You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@esme.apache.org by le...@apache.org on 2012/08/20 13:16:35 UTC
svn commit: r1374973 - in /esme/branches/akka: ./
src/main/scala/bootstrap/liftweb/ src/main/scala/org/apache/esme/actor/
src/main/scala/org/apache/esme/lib/ src/main/scala/org/apache/esme/model/
Author: lester
Date: Mon Aug 20 11:16:34 2012
New Revision: 1374973
URL: http://svn.apache.org/viewvc?rev=1374973&view=rev
Log:
In scope of ESME-360: Began work on XMPP Consumer component action.
Added:
esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
Modified:
esme/branches/akka/build.sbt
esme/branches/akka/pom.xml
esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
Modified: esme/branches/akka/build.sbt
URL: http://svn.apache.org/viewvc/esme/branches/akka/build.sbt?rev=1374973&r1=1374972&r2=1374973&view=diff
==============================================================================
--- esme/branches/akka/build.sbt (original)
+++ esme/branches/akka/build.sbt Mon Aug 20 11:16:34 2012
@@ -8,7 +8,7 @@ version := "1.4"
organization := "Apache Software Foundation"
-scalaVersion := "2.9.1"
+scalaVersion := "2.9.1"
//scalacOptions ++= Seq("-unchecked", "-deprecation")
scalacOptions ++= Seq("-deprecation")
@@ -28,7 +28,7 @@ libraryDependencies ++= {
val compassVersion = "2.1.1"
val luceneVersion = "2.4.0"
val scalazVersion = "6.0.4"
- val akkaVersion = "2.0.2"
+ val akkaVersion = "2.1-20120701-002745"
val eclipsejettyVersion = "7.3.1.v20110307"
val mortbayjettyVersion = "6.1.22"
val slf4jVersion = "1.6.4"
@@ -47,7 +47,7 @@ libraryDependencies ++= {
"net.liftweb" %% "lift-textile" % liftVersion % "compile->default",
"org.scalaz" %% "scalaz-core" % scalazVersion % "compile->default",
"com.typesafe.akka" % "akka-actor" % akkaVersion % "compile->default",
- "com.typesafe.akka" % "akka-camel" % "2.1-SNAPSHOT" % "compile->default",
+ "com.typesafe.akka" % "akka-camel" % "2.1-20120701-002745" % "compile->default",
"org.apache.camel" % "camel-xmpp" % "2.8.0" % "compile->default",
"javax.servlet" % "servlet-api" % "2.5" % "provided->default",
"org.compass-project" % "compass" % compassVersion % "compile->default",
Modified: esme/branches/akka/pom.xml
URL: http://svn.apache.org/viewvc/esme/branches/akka/pom.xml?rev=1374973&r1=1374972&r2=1374973&view=diff
==============================================================================
--- esme/branches/akka/pom.xml (original)
+++ esme/branches/akka/pom.xml Mon Aug 20 11:16:34 2012
@@ -76,7 +76,7 @@
<lift.version>2.4</lift.version>
<scala.version>2.9.1</scala.version>
<scalaz.version>6.0.4</scalaz.version>
- <akka.version>2.1-SNAPSHOT</akka.version>
+ <akka.version>2.1-20120701-002745</akka.version>
<compass.version>2.1.1</compass.version>
<lucene.version>2.4.0</lucene.version>
<netbeans.hint.deploy.server>gfv3</netbeans.hint.deploy.server>
@@ -91,17 +91,6 @@
<url>http://repo2.maven.org/maven2/</url>
</repository>
<repository>
- <id>scala-tools.org</id>
- <name>Scala-Tools Maven2 Repository</name>
- <url>http://scala-tools.org/repo-releases</url>
- </repository>
- <repository>
- <id>scala-tools.org.snapshots</id>
- <name>Scala-Tools Maven2 Repository for Snapshots</name>
- <url>http://scala-tools.org/repo-snapshots</url>
- <snapshots/>
- </repository>
- <repository>
<id>typesafe</id>
<name>Typesafe Repository Releases</name>
<url>http://repo.typesafe.com/typesafe/releases</url>
@@ -135,9 +124,9 @@
<pluginRepositories>
<pluginRepository>
- <id>scala-tools.org</id>
- <name>Scala-Tools Maven2 Repository</name>
- <url>http://scala-tools.org/repo-releases</url>
+ <id>typesafe</id>
+ <name>Typesafe Repository Releases</name>
+ <url>http://repo.typesafe.com/typesafe/releases</url>
</pluginRepository>
</pluginRepositories>
@@ -232,7 +221,7 @@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-camel</artifactId>
- <version>2.1-SNAPSHOT</version>
+ <version>${akka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
Modified: esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
==============================================================================
--- esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala (original)
+++ esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala Mon Aug 20 11:16:34 2012
@@ -27,6 +27,8 @@ import net.liftweb.http.auth._
import net.liftweb.sitemap._
import net.liftweb.sitemap.Loc._
import Helpers._
+import akka.actor.{Props => AkkaProps, ActorSystem}
+
//import TimeHelpers.intToTimeSpanBuilder
//import net.liftweb.mapper.{DB, ConnectionManager, Schemifier, DefaultConnectionIdentifier, ConnectionIdentifier}
import java.sql.{Connection, DriverManager}
@@ -249,6 +251,8 @@ class Boot extends Loggable {
ConvDistributor.touch
// ScalaInterpreter.touch
+ val sys = ActorSystem("camel")
+ val xmppSupervisor = sys.actorOf(AkkaProps(new XmppSupervisor()), "XmppSupervisor")
// Initiating popular links and resent messages
val resentPeriod = Props.getLong("stats.resent.period", 1 week)
Modified: esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
==============================================================================
--- esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala (original)
+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala Mon Aug 20 11:16:34 2012
@@ -64,8 +64,8 @@ object UserActor {
val xmppUsr = Props.get("xmpp.user") openOr ""
val xmppPwd = Props.get("xmpp.password") openOr ""
val xmppServiceName = Props.get("xmpp.serviceName") openOr ""
- lazy val sys = ActorSystem("camel")
- lazy val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)))
+ val sys = ActorSystem("camel")
+ val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)), "XmppSender")
}
@@ -289,8 +289,14 @@ class UserActor extends LiftActor {
Distributor !
Distributor.AddMessageToMailbox(id, msg, ResendReason(userId))
- case FetchFeed(_, _) =>
- MessagePullActor ! MessagePullActor.Fetch(td.performId)
+ case XmppFrom(_) => {
+ val sys = ActorSystem("camel")
+ sys.actorFor("XmppSupervisor") ! XmppSupervisor.Fetch(td.performId)
+ }
+
+
+ case FetchFeed(_, _) =>
+ MessagePullActor ! MessagePullActor.Fetch(td.performId)
case ScalaInterpret => logger.info("Scala interpreter is disabled!")
/*if (msg.source.is != "scala")
Added: esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala?rev=1374973&view=auto
==============================================================================
--- esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala (added)
+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala Mon Aug 20 11:16:34 2012
@@ -0,0 +1,51 @@
+package org.apache.esme.actor
+
+import akka.camel.{CamelMessage, Consumer}
+
+import net.liftweb.common.{Empty, Logger}
+import collection.immutable.Queue
+import org.apache.esme.actor.Distributor.UserCreatedMessage
+import org.apache.esme.model.User
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: lester
+ * Date: 17.08.12
+ * Time: 2:26
+ */
+
+object XmppReceiver {
+ val logger: Logger = Logger("org.apache.esme.actor")
+ case class FetchMessages()
+}
+
+class XmppReceiver(esmeSrv: String, esmePort: Int, esmeUsr: String, esmePwd: String, xmppServiceName: String, participant: String, user: User) extends Consumer {
+
+ import XmppReceiver._
+
+ var messages: List[(String, Long)] = List.empty
+
+ def endpointUri = {val uri = "xmpp://%s@%s:%s/%s?password=%s" format (esmeUsr, esmeSrv, esmePort, participant, esmePwd); logger.info("XMPP URI is: %s".format(uri)); uri}
+
+ def receive = {
+ case msg: CamelMessage => {
+ messages = (msg.bodyAs[String], System.currentTimeMillis) :: messages
+ }
+ case FetchMessages => {
+ messages.foreach(message =>
+ Distributor ! UserCreatedMessage(
+ if (user != null) {user.id} else 0,
+ message._1,
+ Nil,
+ message._2,
+ Empty,
+ participant,
+ Empty,
+ None
+ )
+ )
+ messages = List.empty
+ }
+ case _ => logger.error("Incoming message is not Camel Message!")
+ }
+}
\ No newline at end of file
Added: esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala?rev=1374973&view=auto
==============================================================================
--- esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala (added)
+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala Mon Aug 20 11:16:34 2012
@@ -0,0 +1,66 @@
+package org.apache.esme.actor
+
+import akka.actor.{ActorRef, Actor, Props => AkkaProps}
+import net.liftweb.util.Props
+import org.apache.esme.actor.XmppReceiver.FetchMessages
+import org.apache.esme.model.User
+import net.liftweb.common.Logger
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: lester
+ * Date: 17.08.12
+ * Time: 4:21
+ */
+
+object XmppSupervisor {
+ val logger: Logger = Logger("org.apache.esme.actor")
+
+
+ sealed trait XmppSupervisorActions
+ case class Fetch(id: Long) extends XmppSupervisorActions
+ case class Start(id: Long, who: String, usr: User) extends XmppSupervisorActions
+ case class Stop(id: Long) extends XmppSupervisorActions
+}
+
+class XmppSupervisor extends Actor {
+
+ import XmppSupervisor._
+
+ private var xmppPullActors: Map[Long, ActorRef] = Map.empty
+
+ var xmppHost: String = _
+ var xmppPort: String = _
+ var xmppUsr: String = _
+ var xmppPwd: String = _
+ var xmppServiceName: String = _
+
+
+ override def preStart() {
+ logger.info("preStart() called")
+
+ xmppHost = Props.get("xmpp.host") openOr ""
+ xmppPort = Props.get("xmpp.port") openOr ""
+ xmppUsr = Props.get("xmpp.user") openOr ""
+ xmppPwd = Props.get("xmpp.password") openOr ""
+ xmppServiceName = Props.get("xmpp.serviceName") openOr ""
+ }
+
+ def receive = {
+ case Start(id, who, usr) => {
+ logger.info("Start message received. User: %s, who: %s".format(usr, who))
+ xmppPullActors += (id -> context.actorOf(AkkaProps(new XmppReceiver(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName, who, usr))))
+ }
+ case Stop(id) => {
+ xmppPullActors.get(id).foreach { ref =>
+ context.stop(ref)
+ xmppPullActors -= id
+ }
+ }
+ case Fetch(id) => {
+ logger.info("Fetch message received")
+ xmppPullActors.get(id).foreach(ref => ref ! FetchMessages)
+ }
+ case _ => logger.info("Unknown message received")
+ }
+}
Modified: esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
==============================================================================
--- esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala (original)
+++ esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala Mon Aug 20 11:16:34 2012
@@ -153,7 +153,8 @@ object MsgParser extends TextileParsers(
lazy val password: Parser[String] = user
lazy val mailtoUrl: Parser[String] = accept("mailto:") ~> emailAddr
- lazy val xmppUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
+ lazy val xmppToUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
+ lazy val xmppFromUrl: Parser[String] = accept("xmppfrom:") ~> xmppAddr
lazy val emailAddr: Parser[String] = rep1(xchar) ^^ {
case xs => xs.mkString
@@ -268,13 +269,16 @@ object MsgParser extends TextileParsers(
(mailtoUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
case mt ~ text => MailTo(mt, text.map(_ mkString))
}) |
- (xmppUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
+ (xmppToUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
case mt ~ text => XmppTo(mt, text.map(_ mkString))
}) |
+ (xmppFromUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
+ case mt ~ text => XmppFrom(mt)
+ }) |
(scheme ~ userPass ~ urlpart ~ rep(httpHeader) ~ httpData <~ EOF ^^ {
- case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
- HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
- }) |
+ case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
+ HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
+ }) |
(acceptCI("atom:") ~> httpUrl ~ tags <~ EOF ^^ {
case url ~ tags => FetchAtom(UrlStore.make(url), tags)
}) |
Modified: esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
==============================================================================
--- esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala (original)
+++ esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala Mon Aug 20 11:16:34 2012
@@ -34,7 +34,15 @@ import java.util.Calendar
import java.util.Date
import scala.xml.{Text, Node, Elem => XmlElem}
-object Action extends Action with LongKeyedMetaMapper[Action] {
+import akka.actor.{Props => AkkaProps, ActorSystem}
+
+object Action extends Action with LongKeyedMetaMapper[Action] with Logger {
+
+ val logger: Logger = Logger("org.apache.esme.model")
+ val sys = ActorSystem("camel")
+ val xmppSupervisor = sys.actorFor("XmppSupervisor")
+
+
override def afterCommit = notifyDistributor _ :: super.afterCommit
private def notifyDistributor(in: Action) {
@@ -58,6 +66,7 @@ object Action extends Action with LongKe
} else {
SchedulerActor ! SchedulerActor.StopRegular(in.id)
MessagePullActor ! MessagePullActor.StopPullActor(in.id)
+ xmppSupervisor ! XmppSupervisor.Stop(in.id)
}
}
@@ -185,6 +194,9 @@ object Action extends Action with LongKe
*/
class Action extends LongKeyedMapper[Action] {
+ import Action.xmppSupervisor
+ import Action.logger
+
/**
* Actors related to regularly executed actions are started here
* This is done when the action is activated or at the start of the application
@@ -212,7 +224,14 @@ class Action extends LongKeyedMapper[Act
case FetchRss(_, _) => new RssFeed(u, url.url, urlSourcePrefix + url.uniqueId, 0, tags)
}
MessagePullActor ! MessagePullActor.StartPullActor(id.is, lastMsg, feed)
-
+
+ case _ =>
+ }
+ }
+ case XmppFrom(who) => {
+ User.find(user) match {
+ case Full(u) =>
+ xmppSupervisor ! XmppSupervisor.Start(id.is, who, u)
case _ =>
}
}
@@ -569,6 +588,7 @@ sealed trait Performances
case class MailTo(who: String, text: Option[String]) extends Performances
case class HttpTo(url: String, user: String, password: String, headers: List[(String, String)], data: Option[String]) extends Performances
case class XmppTo(who: String, text: Option[String]) extends Performances
+case class XmppFrom(who: String) extends Performances
case class FetchAtom(override val url: UrlStore, override val tags: List[String]) extends FetchFeed(url, tags)
case class FetchRss(override val url: UrlStore, override val tags: List[String]) extends FetchFeed(url, tags)
case object PerformResend extends Performances