You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@esme.apache.org by AJ Prudhomme <aj...@atlas-title.com> on 2012/08/20 13:50:08 UTC

Re: svn commit: r1374973 - in /esme/branches/akka: ./ src/main/scala/bootstrap/liftweb/ src/main/scala/org/apache/esme/actor/ src/main/scala/org/apache/esme/lib/ src/main/scala/org/apache/esme/model/

Remove!

Sent from my Verizon Wireless Phone

lester@apache.org wrote:

>Author: lester
>Date: Mon Aug 20 11:16:34 2012
>New Revision: 1374973
>
>URL: http://svn.apache.org/viewvc?rev=1374973&view=rev
>Log:
>In scope of ESME-360: Began work on XMPP Consumer component action.
>
>Added:
>    esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
>    esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
>Modified:
>    esme/branches/akka/build.sbt
>    esme/branches/akka/pom.xml
>    esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
>    esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
>    esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
>    esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
>
>Modified: esme/branches/akka/build.sbt
>URL: http://svn.apache.org/viewvc/esme/branches/akka/build.sbt?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/build.sbt (original)
>+++ esme/branches/akka/build.sbt Mon Aug 20 11:16:34 2012
>@@ -8,7 +8,7 @@ version := "1.4"
> 
> organization := "Apache Software Foundation"
> 
>-scalaVersion := "2.9.1"    
>+scalaVersion := "2.9.1"
> 
> //scalacOptions ++= Seq("-unchecked", "-deprecation")  
> scalacOptions ++= Seq("-deprecation") 
>@@ -28,7 +28,7 @@ libraryDependencies ++= {
>   val compassVersion = "2.1.1"
>   val luceneVersion = "2.4.0"
>   val scalazVersion = "6.0.4"
>-  val akkaVersion = "2.0.2"
>+  val akkaVersion = "2.1-20120701-002745"
>   val eclipsejettyVersion = "7.3.1.v20110307"
>   val mortbayjettyVersion = "6.1.22"
>   val slf4jVersion = "1.6.4" 
>@@ -47,7 +47,7 @@ libraryDependencies ++= {
>     "net.liftweb" %% "lift-textile" % liftVersion % "compile->default",
>     "org.scalaz" %% "scalaz-core" % scalazVersion % "compile->default",
>     "com.typesafe.akka" % "akka-actor" % akkaVersion % "compile->default",
>-    "com.typesafe.akka" % "akka-camel" % "2.1-SNAPSHOT" % "compile->default",
>+    "com.typesafe.akka" % "akka-camel" % "2.1-20120701-002745" % "compile->default",
>     "org.apache.camel" % "camel-xmpp" % "2.8.0" % "compile->default",
>     "javax.servlet" % "servlet-api" % "2.5" % "provided->default",
>     "org.compass-project" % "compass" % compassVersion % "compile->default",
>
>Modified: esme/branches/akka/pom.xml
>URL: http://svn.apache.org/viewvc/esme/branches/akka/pom.xml?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/pom.xml (original)
>+++ esme/branches/akka/pom.xml Mon Aug 20 11:16:34 2012
>@@ -76,7 +76,7 @@
>         <lift.version>2.4</lift.version>
>         <scala.version>2.9.1</scala.version>
>         <scalaz.version>6.0.4</scalaz.version>
>-        <akka.version>2.1-SNAPSHOT</akka.version>
>+        <akka.version>2.1-20120701-002745</akka.version>
>         <compass.version>2.1.1</compass.version>
>         <lucene.version>2.4.0</lucene.version>
>         <netbeans.hint.deploy.server>gfv3</netbeans.hint.deploy.server>
>@@ -91,17 +91,6 @@
>             <url>http://repo2.maven.org/maven2/</url>
>         </repository>
>         <repository>
>-            <id>scala-tools.org</id>
>-            <name>Scala-Tools Maven2 Repository</name>
>-            <url>http://scala-tools.org/repo-releases</url>
>-        </repository>
>-        <repository>
>-            <id>scala-tools.org.snapshots</id>
>-            <name>Scala-Tools Maven2 Repository for Snapshots</name>
>-            <url>http://scala-tools.org/repo-snapshots</url>
>-            <snapshots/>
>-        </repository>
>-        <repository>
>             <id>typesafe</id>
>             <name>Typesafe Repository Releases</name>
>             <url>http://repo.typesafe.com/typesafe/releases</url>
>@@ -135,9 +124,9 @@
> 
>     <pluginRepositories>
>         <pluginRepository>
>-            <id>scala-tools.org</id>
>-            <name>Scala-Tools Maven2 Repository</name>
>-            <url>http://scala-tools.org/repo-releases</url>
>+            <id>typesafe</id>
>+            <name>Typesafe Repository Releases</name>
>+            <url>http://repo.typesafe.com/typesafe/releases</url>
>         </pluginRepository>
>     </pluginRepositories>
> 
>@@ -232,7 +221,7 @@
>         <dependency>
>             <groupId>com.typesafe.akka</groupId>
>             <artifactId>akka-camel</artifactId>
>-            <version>2.1-SNAPSHOT</version>
>+            <version>${akka.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.camel</groupId>
>
>Modified: esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
>URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala (original)
>+++ esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala Mon Aug 20 11:16:34 2012
>@@ -27,6 +27,8 @@ import net.liftweb.http.auth._
> import net.liftweb.sitemap._
> import net.liftweb.sitemap.Loc._
> import Helpers._
>+import akka.actor.{Props => AkkaProps, ActorSystem}
>+
> //import TimeHelpers.intToTimeSpanBuilder
> //import net.liftweb.mapper.{DB, ConnectionManager, Schemifier, DefaultConnectionIdentifier, ConnectionIdentifier}
> import java.sql.{Connection, DriverManager}
>@@ -249,6 +251,8 @@ class Boot extends Loggable {
>     ConvDistributor.touch
>     // ScalaInterpreter.touch
> 
>+    val sys = ActorSystem("camel")
>+    val xmppSupervisor = sys.actorOf(AkkaProps(new XmppSupervisor()), "XmppSupervisor")
> 
>     // Initiating popular links and resent messages
>     val resentPeriod = Props.getLong("stats.resent.period", 1 week)
>
>Modified: esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
>URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala (original)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala Mon Aug 20 11:16:34 2012
>@@ -64,8 +64,8 @@ object UserActor {
>   val xmppUsr = Props.get("xmpp.user") openOr ""
>   val xmppPwd = Props.get("xmpp.password") openOr ""
>   val xmppServiceName = Props.get("xmpp.serviceName") openOr ""
>-  lazy val sys = ActorSystem("camel")
>-  lazy val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)))
>+  val sys = ActorSystem("camel")
>+  val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)), "XmppSender")
> }
> 
> 
>@@ -289,8 +289,14 @@ class UserActor extends LiftActor {
>               Distributor !
>               Distributor.AddMessageToMailbox(id, msg, ResendReason(userId))
> 
>-            case FetchFeed(_, _) => 
>-              MessagePullActor ! MessagePullActor.Fetch(td.performId)    
>+            case XmppFrom(_) => {
>+              val sys = ActorSystem("camel")
>+              sys.actorFor("XmppSupervisor") ! XmppSupervisor.Fetch(td.performId)
>+            }
>+
>+
>+            case FetchFeed(_, _) =>
>+              MessagePullActor ! MessagePullActor.Fetch(td.performId)
> 
>             case ScalaInterpret => logger.info("Scala interpreter is disabled!")
>             /*if (msg.source.is != "scala")
>
>Added: esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
>URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala?rev=1374973&view=auto
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala (added)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala Mon Aug 20 11:16:34 2012
>@@ -0,0 +1,51 @@
>+package org.apache.esme.actor
>+
>+import akka.camel.{CamelMessage, Consumer}
>+
>+import net.liftweb.common.{Empty, Logger}
>+import collection.immutable.Queue
>+import org.apache.esme.actor.Distributor.UserCreatedMessage
>+import org.apache.esme.model.User
>+
>+/**
>+ * Created with IntelliJ IDEA.
>+ * User: lester
>+ * Date: 17.08.12
>+ * Time: 2:26
>+ */
>+
>+object XmppReceiver {
>+  val logger: Logger = Logger("org.apache.esme.actor")
>+  case class FetchMessages()
>+}
>+
>+class XmppReceiver(esmeSrv: String, esmePort: Int, esmeUsr: String, esmePwd: String, xmppServiceName: String, participant: String, user: User) extends Consumer {
>+
>+  import XmppReceiver._
>+
>+  var messages: List[(String, Long)] = List.empty
>+
>+  def endpointUri = {val uri = "xmpp://%s@%s:%s/%s?password=%s" format (esmeUsr, esmeSrv, esmePort, participant, esmePwd); logger.info("XMPP URI is: %s".format(uri)); uri}
>+
>+  def receive = {
>+    case msg: CamelMessage => {
>+      messages = (msg.bodyAs[String], System.currentTimeMillis) :: messages
>+    }
>+    case FetchMessages => {
>+      messages.foreach(message =>
>+        Distributor ! UserCreatedMessage(
>+          if (user != null) {user.id} else 0,
>+          message._1,
>+          Nil,
>+          message._2,
>+          Empty,
>+          participant,
>+          Empty,
>+          None
>+        )
>+      )
>+      messages = List.empty
>+    }
>+    case _ => logger.error("Incoming message is not Camel Message!")
>+  }
>+}
>\ No newline at end of file
>
>Added: esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
>URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala?rev=1374973&view=auto
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala (added)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala Mon Aug 20 11:16:34 2012
>@@ -0,0 +1,66 @@
>+package org.apache.esme.actor
>+
>+import akka.actor.{ActorRef, Actor, Props => AkkaProps}
>+import net.liftweb.util.Props
>+import org.apache.esme.actor.XmppReceiver.FetchMessages
>+import org.apache.esme.model.User
>+import net.liftweb.common.Logger
>+
>+/**
>+ * Created with IntelliJ IDEA.
>+ * User: lester
>+ * Date: 17.08.12
>+ * Time: 4:21
>+ */
>+
>+object XmppSupervisor {
>+  val logger: Logger = Logger("org.apache.esme.actor")
>+
>+
>+  sealed trait XmppSupervisorActions
>+  case class Fetch(id: Long) extends XmppSupervisorActions
>+  case class Start(id: Long, who: String, usr: User) extends XmppSupervisorActions
>+  case class Stop(id: Long) extends XmppSupervisorActions
>+}
>+
>+class XmppSupervisor extends Actor {
>+
>+  import XmppSupervisor._
>+
>+  private var xmppPullActors: Map[Long, ActorRef] = Map.empty
>+
>+  var xmppHost: String = _
>+  var xmppPort: String = _
>+  var xmppUsr: String = _
>+  var xmppPwd: String = _
>+  var xmppServiceName: String = _
>+
>+
>+  override def preStart() {
>+    logger.info("preStart() called")
>+
>+    xmppHost = Props.get("xmpp.host") openOr ""
>+    xmppPort = Props.get("xmpp.port") openOr ""
>+    xmppUsr = Props.get("xmpp.user") openOr ""
>+    xmppPwd = Props.get("xmpp.password") openOr ""
>+    xmppServiceName = Props.get("xmpp.serviceName") openOr ""
>+  }
>+
>+  def receive = {
>+    case Start(id, who, usr) => {
>+      logger.info("Start message received. User: %s, who: %s".format(usr, who))
>+      xmppPullActors += (id -> context.actorOf(AkkaProps(new XmppReceiver(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName, who, usr))))
>+    }
>+    case Stop(id) => {
>+      xmppPullActors.get(id).foreach { ref =>
>+        context.stop(ref)
>+        xmppPullActors -= id
>+      }
>+    }
>+    case Fetch(id) => {
>+      logger.info("Fetch message received")
>+      xmppPullActors.get(id).foreach(ref => ref ! FetchMessages)
>+    }
>+    case _ => logger.info("Unknown message received")
>+  }
>+}
>
>Modified: esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
>URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala (original)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala Mon Aug 20 11:16:34 2012
>@@ -153,7 +153,8 @@ object MsgParser extends TextileParsers(
>   lazy val password: Parser[String] = user
> 
>   lazy val mailtoUrl: Parser[String] = accept("mailto:") ~> emailAddr
>-  lazy val xmppUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
>+  lazy val xmppToUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
>+  lazy val xmppFromUrl: Parser[String] = accept("xmppfrom:") ~> xmppAddr
> 
>   lazy val emailAddr: Parser[String] = rep1(xchar) ^^ {
>     case xs => xs.mkString
>@@ -268,13 +269,16 @@ object MsgParser extends TextileParsers(
>   (mailtoUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
>     case mt ~ text => MailTo(mt, text.map(_ mkString))
>   }) |
>-  (xmppUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
>+  (xmppToUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
>     case mt ~ text => XmppTo(mt, text.map(_ mkString))
>   }) |
>+  (xmppFromUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
>+    case mt ~ text => XmppFrom(mt)
>+  }) |
>   (scheme ~ userPass ~ urlpart ~ rep(httpHeader) ~ httpData <~ EOF ^^ {
>-      case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
>-        HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
>-    }) |
>+    case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
>+      HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
>+  }) |
>   (acceptCI("atom:") ~> httpUrl ~ tags <~ EOF ^^ {
>     case url ~ tags => FetchAtom(UrlStore.make(url), tags)
>   }) |
>
>Modified: esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
>URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala (original)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala Mon Aug 20 11:16:34 2012
>@@ -34,7 +34,15 @@ import java.util.Calendar
> import java.util.Date
> import scala.xml.{Text, Node, Elem => XmlElem}
> 
>-object Action extends Action with LongKeyedMetaMapper[Action] {
>+import akka.actor.{Props => AkkaProps, ActorSystem}
>+
>+object Action extends Action with LongKeyedMetaMapper[Action] with Logger {
>+
>+  val logger: Logger = Logger("org.apache.esme.model")
>+  val sys = ActorSystem("camel")
>+  val xmppSupervisor = sys.actorFor("XmppSupervisor")
>+
>+
>   override def afterCommit = notifyDistributor _ :: super.afterCommit
> 
>   private def notifyDistributor(in: Action) {
>@@ -58,6 +66,7 @@ object Action extends Action with LongKe
>     } else {
>       SchedulerActor ! SchedulerActor.StopRegular(in.id)
>       MessagePullActor ! MessagePullActor.StopPullActor(in.id)
>+      xmppSupervisor ! XmppSupervisor.Stop(in.id)
>     }
>   }
>   
>@@ -185,6 +194,9 @@ object Action extends Action with LongKe
>  */
> class Action extends LongKeyedMapper[Action] {
> 
>+  import Action.xmppSupervisor
>+  import Action.logger
>+
>   /**
>    * Actors related to regularly executed actions are started here
>    * This is done when the action is activated or at the start of the application
>@@ -212,7 +224,14 @@ class Action extends LongKeyedMapper[Act
>               case FetchRss(_, _) => new RssFeed(u, url.url, urlSourcePrefix + url.uniqueId, 0, tags)
>             }
>             MessagePullActor ! MessagePullActor.StartPullActor(id.is, lastMsg, feed)
>-          
>+
>+          case _ =>
>+        }
>+      }
>+      case XmppFrom(who) => {
>+        User.find(user) match {
>+          case Full(u) =>
>+            xmppSupervisor ! XmppSupervisor.Start(id.is, who, u)
>           case _ =>
>         }
>       }
>@@ -569,6 +588,7 @@ sealed trait Performances
> case class MailTo(who: String, text: Option[String]) extends Performances
> case class HttpTo(url: String, user: String, password: String, headers: List[(String, String)], data: Option[String]) extends Performances
> case class XmppTo(who: String, text: Option[String]) extends Performances
>+case class XmppFrom(who: String) extends Performances
> case class FetchAtom(override val url: UrlStore, override val tags: List[String]) extends FetchFeed(url, tags)
> case class FetchRss(override val url: UrlStore, override val tags: List[String]) extends FetchFeed(url, tags)
> case object PerformResend extends Performances
>
>

Re: svn commit: r1374973 - in /esme/branches/akka: ./ src/main/scala/bootstrap/liftweb/ src/main/scala/org/apache/esme/actor/ src/main/scala/org/apache/esme/lib/ src/main/scala/org/apache/esme/model/

Posted by Vladimir Ivanov <le...@gmail.com>.
Done.

Sorry, the comments were inserted due to a recent upgrade of my IDE.

Best Regards,
Vladimir

2012/8/20 AJ Prudhomme <aj...@atlas-title.com>

> Remove!
>
> Sent from my Verizon Wireless Phone
>
> lester@apache.org wrote:
>
> >Author: lester
> >Date: Mon Aug 20 11:16:34 2012
> >New Revision: 1374973
> >
> >URL: http://svn.apache.org/viewvc?rev=1374973&view=rev
> >Log:
> >In scope of ESME-360: Began work on XMPP Consumer component action.
> >
> >Added:
> >
>  esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
> >
>  esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
> >Modified:
> >    esme/branches/akka/build.sbt
> >    esme/branches/akka/pom.xml
> >    esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
> >
>  esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
> >    esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
> >    esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
> >
> >Modified: esme/branches/akka/build.sbt
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/build.sbt?rev=1374973&r1=1374972&r2=1374973&view=diff
>
> >==============================================================================
> >--- esme/branches/akka/build.sbt (original)
> >+++ esme/branches/akka/build.sbt Mon Aug 20 11:16:34 2012
> >@@ -8,7 +8,7 @@ version := "1.4"
> >
> > organization := "Apache Software Foundation"
> >
> >-scalaVersion := "2.9.1"
> >+scalaVersion := "2.9.1"
> >
> > //scalacOptions ++= Seq("-unchecked", "-deprecation")
> > scalacOptions ++= Seq("-deprecation")
> >@@ -28,7 +28,7 @@ libraryDependencies ++= {
> >   val compassVersion = "2.1.1"
> >   val luceneVersion = "2.4.0"
> >   val scalazVersion = "6.0.4"
> >-  val akkaVersion = "2.0.2"
> >+  val akkaVersion = "2.1-20120701-002745"
> >   val eclipsejettyVersion = "7.3.1.v20110307"
> >   val mortbayjettyVersion = "6.1.22"
> >   val slf4jVersion = "1.6.4"
> >@@ -47,7 +47,7 @@ libraryDependencies ++= {
> >     "net.liftweb" %% "lift-textile" % liftVersion % "compile->default",
> >     "org.scalaz" %% "scalaz-core" % scalazVersion % "compile->default",
> >     "com.typesafe.akka" % "akka-actor" % akkaVersion %
> "compile->default",
> >-    "com.typesafe.akka" % "akka-camel" % "2.1-SNAPSHOT" %
> "compile->default",
> >+    "com.typesafe.akka" % "akka-camel" % "2.1-20120701-002745" %
> "compile->default",
> >     "org.apache.camel" % "camel-xmpp" % "2.8.0" % "compile->default",
> >     "javax.servlet" % "servlet-api" % "2.5" % "provided->default",
> >     "org.compass-project" % "compass" % compassVersion %
> "compile->default",
> >
> >Modified: esme/branches/akka/pom.xml
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/pom.xml?rev=1374973&r1=1374972&r2=1374973&view=diff
>
> >==============================================================================
> >--- esme/branches/akka/pom.xml (original)
> >+++ esme/branches/akka/pom.xml Mon Aug 20 11:16:34 2012
> >@@ -76,7 +76,7 @@
> >         <lift.version>2.4</lift.version>
> >         <scala.version>2.9.1</scala.version>
> >         <scalaz.version>6.0.4</scalaz.version>
> >-        <akka.version>2.1-SNAPSHOT</akka.version>
> >+        <akka.version>2.1-20120701-002745</akka.version>
> >         <compass.version>2.1.1</compass.version>
> >         <lucene.version>2.4.0</lucene.version>
> >         <netbeans.hint.deploy.server>gfv3</netbeans.hint.deploy.server>
> >@@ -91,17 +91,6 @@
> >             <url>http://repo2.maven.org/maven2/</url>
> >         </repository>
> >         <repository>
> >-            <id>scala-tools.org</id>
> >-            <name>Scala-Tools Maven2 Repository</name>
> >-            <url>http://scala-tools.org/repo-releases</url>
> >-        </repository>
> >-        <repository>
> >-            <id>scala-tools.org.snapshots</id>
> >-            <name>Scala-Tools Maven2 Repository for Snapshots</name>
> >-            <url>http://scala-tools.org/repo-snapshots</url>
> >-            <snapshots/>
> >-        </repository>
> >-        <repository>
> >             <id>typesafe</id>
> >             <name>Typesafe Repository Releases</name>
> >             <url>http://repo.typesafe.com/typesafe/releases</url>
> >@@ -135,9 +124,9 @@
> >
> >     <pluginRepositories>
> >         <pluginRepository>
> >-            <id>scala-tools.org</id>
> >-            <name>Scala-Tools Maven2 Repository</name>
> >-            <url>http://scala-tools.org/repo-releases</url>
> >+            <id>typesafe</id>
> >+            <name>Typesafe Repository Releases</name>
> >+            <url>http://repo.typesafe.com/typesafe/releases</url>
> >         </pluginRepository>
> >     </pluginRepositories>
> >
> >@@ -232,7 +221,7 @@
> >         <dependency>
> >             <groupId>com.typesafe.akka</groupId>
> >             <artifactId>akka-camel</artifactId>
> >-            <version>2.1-SNAPSHOT</version>
> >+            <version>${akka.version}</version>
> >         </dependency>
> >         <dependency>
> >             <groupId>org.apache.camel</groupId>
> >
> >Modified: esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>
> >==============================================================================
> >--- esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
> (original)
> >+++ esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala Mon
> Aug 20 11:16:34 2012
> >@@ -27,6 +27,8 @@ import net.liftweb.http.auth._
> > import net.liftweb.sitemap._
> > import net.liftweb.sitemap.Loc._
> > import Helpers._
> >+import akka.actor.{Props => AkkaProps, ActorSystem}
> >+
> > //import TimeHelpers.intToTimeSpanBuilder
> > //import net.liftweb.mapper.{DB, ConnectionManager, Schemifier,
> DefaultConnectionIdentifier, ConnectionIdentifier}
> > import java.sql.{Connection, DriverManager}
> >@@ -249,6 +251,8 @@ class Boot extends Loggable {
> >     ConvDistributor.touch
> >     // ScalaInterpreter.touch
> >
> >+    val sys = ActorSystem("camel")
> >+    val xmppSupervisor = sys.actorOf(AkkaProps(new XmppSupervisor()),
> "XmppSupervisor")
> >
> >     // Initiating popular links and resent messages
> >     val resentPeriod = Props.getLong("stats.resent.period", 1 week)
> >
> >Modified:
> esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>
> >==============================================================================
> >---
> esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
> (original)
> >+++
> esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala Mon
> Aug 20 11:16:34 2012
> >@@ -64,8 +64,8 @@ object UserActor {
> >   val xmppUsr = Props.get("xmpp.user") openOr ""
> >   val xmppPwd = Props.get("xmpp.password") openOr ""
> >   val xmppServiceName = Props.get("xmpp.serviceName") openOr ""
> >-  lazy val sys = ActorSystem("camel")
> >-  lazy val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost,
> xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)))
> >+  val sys = ActorSystem("camel")
> >+  val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost,
> xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)), "XmppSender")
> > }
> >
> >
> >@@ -289,8 +289,14 @@ class UserActor extends LiftActor {
> >               Distributor !
> >               Distributor.AddMessageToMailbox(id, msg,
> ResendReason(userId))
> >
> >-            case FetchFeed(_, _) =>
> >-              MessagePullActor ! MessagePullActor.Fetch(td.performId)
> >+            case XmppFrom(_) => {
> >+              val sys = ActorSystem("camel")
> >+              sys.actorFor("XmppSupervisor") !
> XmppSupervisor.Fetch(td.performId)
> >+            }
> >+
> >+
> >+            case FetchFeed(_, _) =>
> >+              MessagePullActor ! MessagePullActor.Fetch(td.performId)
> >
> >             case ScalaInterpret => logger.info("Scala interpreter is
> disabled!")
> >             /*if (msg.source.is != "scala")
> >
> >Added:
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala?rev=1374973&view=auto
>
> >==============================================================================
> >---
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
> (added)
> >+++
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
> Mon Aug 20 11:16:34 2012
> >@@ -0,0 +1,51 @@
> >+package org.apache.esme.actor
> >+
> >+import akka.camel.{CamelMessage, Consumer}
> >+
> >+import net.liftweb.common.{Empty, Logger}
> >+import collection.immutable.Queue
> >+import org.apache.esme.actor.Distributor.UserCreatedMessage
> >+import org.apache.esme.model.User
> >+
> >+/**
> >+ * Created with IntelliJ IDEA.
> >+ * User: lester
> >+ * Date: 17.08.12
> >+ * Time: 2:26
> >+ */
> >+
> >+object XmppReceiver {
> >+  val logger: Logger = Logger("org.apache.esme.actor")
> >+  case class FetchMessages()
> >+}
> >+
> >+class XmppReceiver(esmeSrv: String, esmePort: Int, esmeUsr: String,
> esmePwd: String, xmppServiceName: String, participant: String, user: User)
> extends Consumer {
> >+
> >+  import XmppReceiver._
> >+
> >+  var messages: List[(String, Long)] = List.empty
> >+
> >+  def endpointUri = {val uri = "xmpp://%s@%s:%s/%s?password=%s" format
> (esmeUsr, esmeSrv, esmePort, participant, esmePwd); logger.info("XMPP URI
> is: %s".format(uri)); uri}
> >+
> >+  def receive = {
> >+    case msg: CamelMessage => {
> >+      messages = (msg.bodyAs[String], System.currentTimeMillis) ::
> messages
> >+    }
> >+    case FetchMessages => {
> >+      messages.foreach(message =>
> >+        Distributor ! UserCreatedMessage(
> >+          if (user != null) {user.id} else 0,
> >+          message._1,
> >+          Nil,
> >+          message._2,
> >+          Empty,
> >+          participant,
> >+          Empty,
> >+          None
> >+        )
> >+      )
> >+      messages = List.empty
> >+    }
> >+    case _ => logger.error("Incoming message is not Camel Message!")
> >+  }
> >+}
> >\ No newline at end of file
> >
> >Added:
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala?rev=1374973&view=auto
>
> >==============================================================================
> >---
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
> (added)
> >+++
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
> Mon Aug 20 11:16:34 2012
> >@@ -0,0 +1,66 @@
> >+package org.apache.esme.actor
> >+
> >+import akka.actor.{ActorRef, Actor, Props => AkkaProps}
> >+import net.liftweb.util.Props
> >+import org.apache.esme.actor.XmppReceiver.FetchMessages
> >+import org.apache.esme.model.User
> >+import net.liftweb.common.Logger
> >+
> >+/**
> >+ * Created with IntelliJ IDEA.
> >+ * User: lester
> >+ * Date: 17.08.12
> >+ * Time: 4:21
> >+ */
> >+
> >+object XmppSupervisor {
> >+  val logger: Logger = Logger("org.apache.esme.actor")
> >+
> >+
> >+  sealed trait XmppSupervisorActions
> >+  case class Fetch(id: Long) extends XmppSupervisorActions
> >+  case class Start(id: Long, who: String, usr: User) extends
> XmppSupervisorActions
> >+  case class Stop(id: Long) extends XmppSupervisorActions
> >+}
> >+
> >+class XmppSupervisor extends Actor {
> >+
> >+  import XmppSupervisor._
> >+
> >+  private var xmppPullActors: Map[Long, ActorRef] = Map.empty
> >+
> >+  var xmppHost: String = _
> >+  var xmppPort: String = _
> >+  var xmppUsr: String = _
> >+  var xmppPwd: String = _
> >+  var xmppServiceName: String = _
> >+
> >+
> >+  override def preStart() {
> >+    logger.info("preStart() called")
> >+
> >+    xmppHost = Props.get("xmpp.host") openOr ""
> >+    xmppPort = Props.get("xmpp.port") openOr ""
> >+    xmppUsr = Props.get("xmpp.user") openOr ""
> >+    xmppPwd = Props.get("xmpp.password") openOr ""
> >+    xmppServiceName = Props.get("xmpp.serviceName") openOr ""
> >+  }
> >+
> >+  def receive = {
> >+    case Start(id, who, usr) => {
> >+      logger.info("Start message received. User: %s, who:
> %s".format(usr, who))
> >+      xmppPullActors += (id -> context.actorOf(AkkaProps(new
> XmppReceiver(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName,
> who, usr))))
> >+    }
> >+    case Stop(id) => {
> >+      xmppPullActors.get(id).foreach { ref =>
> >+        context.stop(ref)
> >+        xmppPullActors -= id
> >+      }
> >+    }
> >+    case Fetch(id) => {
> >+      logger.info("Fetch message received")
> >+      xmppPullActors.get(id).foreach(ref => ref ! FetchMessages)
> >+    }
> >+    case _ => logger.info("Unknown message received")
> >+  }
> >+}
> >
> >Modified:
> esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>
> >==============================================================================
> >--- esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
> (original)
> >+++ esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
> Mon Aug 20 11:16:34 2012
> >@@ -153,7 +153,8 @@ object MsgParser extends TextileParsers(
> >   lazy val password: Parser[String] = user
> >
> >   lazy val mailtoUrl: Parser[String] = accept("mailto:") ~> emailAddr
> >-  lazy val xmppUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
> >+  lazy val xmppToUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
> >+  lazy val xmppFromUrl: Parser[String] = accept("xmppfrom:") ~> xmppAddr
> >
> >   lazy val emailAddr: Parser[String] = rep1(xchar) ^^ {
> >     case xs => xs.mkString
> >@@ -268,13 +269,16 @@ object MsgParser extends TextileParsers(
> >   (mailtoUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
> >     case mt ~ text => MailTo(mt, text.map(_ mkString))
> >   }) |
> >-  (xmppUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
> >+  (xmppToUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
> >     case mt ~ text => XmppTo(mt, text.map(_ mkString))
> >   }) |
> >+  (xmppFromUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
> >+    case mt ~ text => XmppFrom(mt)
> >+  }) |
> >   (scheme ~ userPass ~ urlpart ~ rep(httpHeader) ~ httpData <~ EOF ^^ {
> >-      case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
> >-        HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
> >-    }) |
> >+    case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
> >+      HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
> >+  }) |
> >   (acceptCI("atom:") ~> httpUrl ~ tags <~ EOF ^^ {
> >     case url ~ tags => FetchAtom(UrlStore.make(url), tags)
> >   }) |
> >
> >Modified:
> esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>
> >==============================================================================
> >--- esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala (original)
> >+++ esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala Mon Aug 20 11:16:34 2012
> >@@ -34,7 +34,15 @@ import java.util.Calendar
> > import java.util.Date
> > import scala.xml.{Text, Node, Elem => XmlElem}
> >
> >-object Action extends Action with LongKeyedMetaMapper[Action] {
> >+import akka.actor.{Props => AkkaProps, ActorSystem}
> >+
> >+object Action extends Action with LongKeyedMetaMapper[Action] with Logger {
> >+
> >+  val logger: Logger = Logger("org.apache.esme.model")
> >+  val sys = ActorSystem("camel")
> >+  val xmppSupervisor = sys.actorFor("XmppSupervisor")
> >+
> >+
> >   override def afterCommit = notifyDistributor _ :: super.afterCommit
> >
> >   private def notifyDistributor(in: Action) {
> >@@ -58,6 +66,7 @@ object Action extends Action with LongKe
> >     } else {
> >       SchedulerActor ! SchedulerActor.StopRegular(in.id)
> >       MessagePullActor ! MessagePullActor.StopPullActor(in.id)
> >+      xmppSupervisor ! XmppSupervisor.Stop(in.id)
> >     }
> >   }
> >
> >@@ -185,6 +194,9 @@ object Action extends Action with LongKe
> >  */
> > class Action extends LongKeyedMapper[Action] {
> >
> >+  import Action.xmppSupervisor
> >+  import Action.logger
> >+
> >   /**
> >    * Actors related to regularly executed actions are started here
> >    * This is done when the action is activated or at the start of the application
> >@@ -212,7 +224,14 @@ class Action extends LongKeyedMapper[Act
> >               case FetchRss(_, _) => new RssFeed(u, url.url, urlSourcePrefix + url.uniqueId, 0, tags)
> >             }
> >             MessagePullActor ! MessagePullActor.StartPullActor(id.is, lastMsg, feed)
> >-
> >+
> >+          case _ =>
> >+        }
> >+      }
> >+      case XmppFrom(who) => {
> >+        User.find(user) match {
> >+          case Full(u) =>
> >+            xmppSupervisor ! XmppSupervisor.Start(id.is, who, u)
> >           case _ =>
> >         }
> >       }
> >@@ -569,6 +588,7 @@ sealed trait Performances
> > case class MailTo(who: String, text: Option[String]) extends Performances
> > case class HttpTo(url: String, user: String, password: String, headers: List[(String, String)], data: Option[String]) extends Performances
> > case class XmppTo(who: String, text: Option[String]) extends Performances
> >+case class XmppFrom(who: String) extends Performances
> > case class FetchAtom(override val url: UrlStore, override val tags: List[String]) extends FetchFeed(url, tags)
> > case class FetchRss(override val url: UrlStore, override val tags: List[String]) extends FetchFeed(url, tags)
> > case object PerformResend extends Performances
> >
> >
>



-- 
Best Regards,
Vladimir Ivanov