You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:43 UTC
[12/34] incubator-predictionio git commit: rename all except examples
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
new file mode 100644
index 0000000..7174ec8
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
@@ -0,0 +1,640 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.api
+
+import akka.event.Logging
+import sun.misc.BASE64Decoder
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import akka.io.IO
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.predictionio.data.Utils
+import org.apache.predictionio.data.storage.AccessKeys
+import org.apache.predictionio.data.storage.Channels
+import org.apache.predictionio.data.storage.DateTimeJson4sSupport
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventJson4sSupport
+import org.apache.predictionio.data.storage.BatchEventsJson4sSupport
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.Storage
+import org.json4s.DefaultFormats
+import org.json4s.Formats
+import org.json4s.JObject
+import org.json4s.native.JsonMethods.parse
+import spray.can.Http
+import spray.http.FormData
+import spray.http.MediaTypes
+import spray.http.StatusCodes
+import spray.httpx.Json4sSupport
+import spray.routing._
+import spray.routing.authentication.Authentication
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.util.{Try, Success, Failure}
+
+class EventServiceActor(
+ val eventClient: LEvents,
+ val accessKeysClient: AccessKeys,
+ val channelsClient: Channels,
+ val config: EventServerConfig) extends HttpServiceActor {
+
+ // Json4s-based (un)marshalling support imported by the routes below.
+ // The formats are DefaultFormats extended with the event, batch-event and
+ // date-time serializers.
+ object Json4sProtocol extends Json4sSupport {
+ implicit def json4sFormats: Formats = DefaultFormats +
+ new EventJson4sSupport.APISerializer +
+ new BatchEventsJson4sSupport.APISerializer +
+ // NOTE: don't use Json4s JodaTimeSerializers since it has issues,
+ // some format not converted, or timezone not correct
+ new DateTimeJson4sSupport.Serializer
+ }
+
+
+ // Upper bound on the number of events accepted by one batch request; larger
+ // batches are answered with 400 Bad Request by the batch route.
+ val MaxNumberOfEventsPerBatchRequest = 50
+
+ val logger = Logging(context.system, this)
+
+ // we use the enclosing ActorContext's or ActorSystem's dispatcher for our
+ // Futures
+ implicit def executionContext: ExecutionContext = context.dispatcher
+
+ // Implicit timeout in scope for the `?` (ask) calls made by the routes.
+ implicit val timeout = Timeout(5, TimeUnit.SECONDS)
+
+ val rejectionHandler = Common.rejectionHandler
+
+ // Path matchers capturing whatever precedes a trailing ".json" / ".form".
+ val jsonPath = """(.+)\.json$""".r
+ val formPath = """(.+)\.form$""".r
+
+ val pluginContext = EventServerPluginContext(logger)
+
+ // Decoder for the credentials of a Basic Authorization header.
+ private lazy val base64Decoder = new BASE64Decoder
+
+ // Outcome of a successful authentication: the app the access key belongs to,
+ // the resolved channel (if one was requested) and the event names stored with
+ // the key. The POST routes treat an empty `events` as "no restriction".
+ case class AuthData(appId: Int, channelId: Option[Int], events: Seq[String])
+
+  /** Authenticates a request by access key and yields the matching [[AuthData]].
+    *
+    * The key is read from the `accessKey` query parameter when present. Otherwise
+    * it is read from a Basic `Authorization` header, where the decoded text up to
+    * the first `:` is used as the key.
+    *
+    * Outcomes:
+    *  - unknown key or malformed header: `FailedAuth`
+    *  - neither query parameter nor header: `MissedAuth`
+    *  - `channel` query parameter naming no channel of the app: `ChannelRejection`
+    *
+    * The `channel` parameter is only evaluated together with a query-string key;
+    * header authentication always yields `channelId = None`.
+    */
+  def withAccessKey: RequestContext => Future[Authentication[AuthData]] = {
+    ctx: RequestContext =>
+      val accessKeyParamOpt = ctx.request.uri.query.get("accessKey")
+      val channelParamOpt = ctx.request.uri.query.get("channel")
+      Future {
+        // with accessKey in query, return appId if succeed
+        accessKeyParamOpt.map { accessKeyParam =>
+          accessKeysClient.get(accessKeyParam).map { k =>
+            channelParamOpt.map { ch =>
+              val channelMap =
+                channelsClient.getByAppid(k.appid)
+                  .map(c => (c.name, c.id)).toMap
+              channelMap.get(ch) match {
+                case Some(id) => Right(AuthData(k.appid, Some(id), k.events))
+                case None => Left(ChannelRejection(s"Invalid channel '$ch'."))
+              }
+            }.getOrElse {
+              Right(AuthData(k.appid, None, k.events))
+            }
+          }.getOrElse(FailedAuth)
+        }.getOrElse {
+          // with accessKey in header, return appId if succeed
+          ctx.request.headers.find(_.name == "Authorization").map { authHeader =>
+            authHeader.value.split("Basic ") match {
+              case Array(_, value) =>
+                // Keep the text before the first ':'. takeWhile is total, whereas
+                // split(":")(0) throws when the decoded credentials consist of
+                // colons only (String.split drops trailing empty strings).
+                val appAccessKey =
+                  new String(base64Decoder.decodeBuffer(value)).trim.takeWhile(_ != ':')
+                accessKeysClient.get(appAccessKey) match {
+                  case Some(k) => Right(AuthData(k.appid, None, k.events))
+                  case None => FailedAuth
+                }
+
+              case _ => FailedAuth
+            }
+          }.getOrElse(MissedAuth)
+        }
+      }
+  }
+
+  // Rejection used when credentials were supplied but are not accepted.
+  private val FailedAuth =
+    Left(AuthenticationFailedRejection(AuthenticationFailedRejection.CredentialsRejected, Nil))
+
+  // Rejection used when the request carries no credentials at all.
+  private val MissedAuth =
+    Left(AuthenticationFailedRejection(AuthenticationFailedRejection.CredentialsMissing, Nil))
+
+  // Selections of the top-level helper actors, created on first use.
+  lazy val statsActorRef = actorRefFactory.actorSelection("/user/StatsActor")
+  lazy val pluginsActorRef = actorRefFactory.actorSelection("/user/PluginsActor")
+
+ // HTTP routes served by this actor; the alternatives are chained with `~`.
+ val route: Route =
+ // GET / : answers {"status": "alive"}; no authentication is applied here.
+ pathSingleSlash {
+ import Json4sProtocol._
+
+ get {
+ respondWithMediaType(MediaTypes.`application/json`) {
+ complete(Map("status" -> "alive"))
+ }
+ }
+ } ~
+ // GET /plugins.json : lists the loaded input blocker and input sniffer
+ // plugins (name, description, implementing class). No authentication is
+ // applied to this path.
+ path("plugins.json") {
+ import Json4sProtocol._
+ get {
+ respondWithMediaType(MediaTypes.`application/json`) {
+ complete {
+ Map("plugins" -> Map(
+ "inputblockers" -> pluginContext.inputBlockers.map { case (n, p) =>
+ n -> Map(
+ "name" -> p.pluginName,
+ "description" -> p.pluginDescription,
+ "class" -> p.getClass.getName)
+ },
+ "inputsniffers" -> pluginContext.inputSniffers.map { case (n, p) =>
+ n -> Map(
+ "name" -> p.pluginName,
+ "description" -> p.pluginDescription,
+ "class" -> p.getClass.getName)
+ }
+ ))
+ }
+ }
+ }
+ } ~
+ // GET /plugins/<type>/<name>/<args...> : forwards the remaining path
+ // segments to the named plugin's handleREST. Input blockers are called
+ // directly; input sniffers are asked through the PluginsActor.
+ path("plugins" / Segments) { segments =>
+ get {
+ handleExceptions(Common.exceptionHandler) {
+ authenticate(withAccessKey) { authData =>
+ respondWithMediaType(MediaTypes.`application/json`) {
+ complete {
+ // NOTE(review): segments(0)/segments(1) throw when fewer than two
+ // segments are given, and the match below has no default case for
+ // other plugin types; both surface as exceptions.
+ val pluginArgs = segments.drop(2)
+ val pluginType = segments(0)
+ val pluginName = segments(1)
+ pluginType match {
+ case EventServerPlugin.inputBlocker =>
+ pluginContext.inputBlockers(pluginName).handleREST(
+ authData.appId,
+ authData.channelId,
+ pluginArgs)
+ case EventServerPlugin.inputSniffer =>
+ pluginsActorRef ? PluginsActor.HandleREST(
+ appId = authData.appId,
+ channelId = authData.channelId,
+ pluginName = pluginName,
+ pluginArgs = pluginArgs) map {
+ _.asInstanceOf[String]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ } ~
+ // /events/<eventId>.json : read or delete a single event of the
+ // authenticated app (and channel, if one was resolved).
+ path("events" / jsonPath ) { eventId =>
+
+ import Json4sProtocol._
+
+ // GET: 200 with the event, or 404 {"message": "Not Found"}.
+ get {
+ handleExceptions(Common.exceptionHandler) {
+ handleRejections(rejectionHandler) {
+ authenticate(withAccessKey) { authData =>
+ val appId = authData.appId
+ val channelId = authData.channelId
+ respondWithMediaType(MediaTypes.`application/json`) {
+ complete {
+ logger.debug(s"GET event ${eventId}.")
+ val data = eventClient.futureGet(eventId, appId, channelId).map { eventOpt =>
+ eventOpt.map( event =>
+ (StatusCodes.OK, event)
+ ).getOrElse(
+ (StatusCodes.NotFound, Map("message" -> "Not Found"))
+ )
+ }
+ data
+ }
+ }
+ }
+ }
+ }
+ } ~
+ // DELETE: 200 {"message": "Found"} when futureDelete reports the event was
+ // found, otherwise 404.
+ delete {
+ handleExceptions(Common.exceptionHandler) {
+ handleRejections(rejectionHandler) {
+ authenticate(withAccessKey) { authData =>
+ val appId = authData.appId
+ val channelId = authData.channelId
+ respondWithMediaType(MediaTypes.`application/json`) {
+ complete {
+ logger.debug(s"DELETE event ${eventId}.")
+ val data = eventClient.futureDelete(eventId, appId, channelId).map { found =>
+ if (found) {
+ (StatusCodes.OK, Map("message" -> "Found"))
+ } else {
+ (StatusCodes.NotFound, Map("message" -> "Not Found"))
+ }
+ }
+ data
+ }
+ }
+ }
+ }
+ }
+ }
+ } ~
+ // /events.json : create one event (POST) or query events (GET).
+ path("events.json") {
+
+ import Json4sProtocol._
+
+ // POST: when the access key lists event names and this event is not among
+ // them the answer is 403. Otherwise every input blocker plugin processes the
+ // event, it is inserted, and the answer is 201 with the new event id. After
+ // the insert the PluginsActor is notified, and so is the StatsActor when
+ // config.stats is set.
+ post {
+ handleExceptions(Common.exceptionHandler) {
+ handleRejections(rejectionHandler) {
+ authenticate(withAccessKey) { authData =>
+ val appId = authData.appId
+ val channelId = authData.channelId
+ val events = authData.events
+ entity(as[Event]) { event =>
+ complete {
+ if (events.isEmpty || authData.events.contains(event.event)) {
+ pluginContext.inputBlockers.values.foreach(
+ _.process(EventInfo(
+ appId = appId,
+ channelId = channelId,
+ event = event), pluginContext))
+ val data = eventClient.futureInsert(event, appId, channelId).map { id =>
+ pluginsActorRef ! EventInfo(
+ appId = appId,
+ channelId = channelId,
+ event = event)
+ val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
+ if (config.stats) {
+ statsActorRef ! Bookkeeping(appId, result._1, event)
+ }
+ result
+ }
+ data
+ } else {
+ (StatusCodes.Forbidden,
+ Map("message" -> s"${event.event} events are not allowed"))
+ }
+ }
+ }
+ }
+ }
+ }
+ } ~
+ // GET: finds events matching the optional query parameters. `limit`
+ // defaults to 20; `reversed=true` requires both entityType and entityId.
+ // Answers 200 with the events, 404 when none match, and 400 with the
+ // exception text when time parsing or the lookup fails with an Exception.
+ get {
+ handleExceptions(Common.exceptionHandler) {
+ handleRejections(rejectionHandler) {
+ authenticate(withAccessKey) { authData =>
+ val appId = authData.appId
+ val channelId = authData.channelId
+ parameters(
+ 'startTime.as[Option[String]],
+ 'untilTime.as[Option[String]],
+ 'entityType.as[Option[String]],
+ 'entityId.as[Option[String]],
+ 'event.as[Option[String]],
+ 'targetEntityType.as[Option[String]],
+ 'targetEntityId.as[Option[String]],
+ 'limit.as[Option[Int]],
+ 'reversed.as[Option[Boolean]]) {
+ (startTimeStr, untilTimeStr, entityType, entityId,
+ eventName, // only support one event name
+ targetEntityType, targetEntityId,
+ limit, reversed) =>
+ respondWithMediaType(MediaTypes.`application/json`) {
+ complete {
+ logger.debug(
+ s"GET events of appId=${appId} " +
+ s"st=${startTimeStr} ut=${untilTimeStr} " +
+ s"et=${entityType} eid=${entityId} " +
+ s"li=${limit} rev=${reversed} ")
+
+ require(!((reversed == Some(true))
+ && (entityType.isEmpty || entityId.isEmpty)),
+ "the parameter reversed can only be used with" +
+ " both entityType and entityId specified.")
+
+ // Parse inside a Future so a malformed timestamp fails the future and
+ // is turned into a 400 by the recover below.
+ val parseTime = Future {
+ val startTime = startTimeStr.map(Utils.stringToDateTime(_))
+ val untilTime = untilTimeStr.map(Utils.stringToDateTime(_))
+ (startTime, untilTime)
+ }
+
+
+ parseTime.flatMap { case (startTime, untilTime) =>
+ val data = eventClient.futureFind(
+ appId = appId,
+ channelId = channelId,
+ startTime = startTime,
+ untilTime = untilTime,
+ entityType = entityType,
+ entityId = entityId,
+ eventNames = eventName.map(List(_)),
+ targetEntityType = targetEntityType.map(Some(_)),
+ targetEntityId = targetEntityId.map(Some(_)),
+ limit = limit.orElse(Some(20)),
+ reversed = reversed)
+ .map { eventIter =>
+ if (eventIter.hasNext) {
+ (StatusCodes.OK, eventIter.toArray)
+ } else {
+ (StatusCodes.NotFound,
+ Map("message" -> "Not Found"))
+ }
+ }
+ data
+ }.recover {
+ case e: Exception =>
+ (StatusCodes.BadRequest, Map("message" -> s"${e}"))
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ } ~
+ // POST /batch/events.json : inserts up to MaxNumberOfEventsPerBatchRequest
+ // events. Each element is handled on its own and contributes one result map
+ // carrying its own "status" (201, 400, 403 or 500); a larger batch is
+ // answered with a single 400.
+ path("batch" / "events.json") {
+
+ import Json4sProtocol._
+
+ post {
+ handleExceptions(Common.exceptionHandler) {
+ handleRejections(rejectionHandler) {
+ authenticate(withAccessKey) { authData =>
+ val appId = authData.appId
+ val channelId = authData.channelId
+ val allowedEvents = authData.events
+ // Maps one parsed (or unparsable) batch element to its result entry.
+ val handleEvent: PartialFunction[Try[Event], Future[Map[String, Any]]] = {
+ case Success(event) => {
+ if (allowedEvents.isEmpty || allowedEvents.contains(event.event)) {
+ pluginContext.inputBlockers.values.foreach(
+ _.process(EventInfo(
+ appId = appId,
+ channelId = channelId,
+ event = event), pluginContext))
+ val data = eventClient.futureInsert(event, appId, channelId).map { id =>
+ pluginsActorRef ! EventInfo(
+ appId = appId,
+ channelId = channelId,
+ event = event)
+ val status = StatusCodes.Created
+ val result = Map(
+ "status" -> status.intValue,
+ "eventId" -> s"${id}")
+ if (config.stats) {
+ statsActorRef ! Bookkeeping(appId, status, event)
+ }
+ result
+ }.recover { case exception =>
+ Map(
+ "status" -> StatusCodes.InternalServerError.intValue,
+ "message" -> s"${exception.getMessage()}")
+ }
+ data
+ } else {
+ Future.successful(Map(
+ "status" -> StatusCodes.Forbidden.intValue,
+ "message" -> s"${event.event} events are not allowed"))
+ }
+ }
+ case Failure(exception) => {
+ Future.successful(Map(
+ "status" -> StatusCodes.BadRequest.intValue,
+ "message" -> s"${exception.getMessage()}"))
+ }
+ }
+
+ entity(as[Seq[Try[Event]]]) { events =>
+ complete {
+ if (events.length <= MaxNumberOfEventsPerBatchRequest) {
+ Future.traverse(events)(handleEvent)
+ } else {
+ (StatusCodes.BadRequest,
+ Map("message" -> (s"Batch request must have less than or equal to " +
+ s"${MaxNumberOfEventsPerBatchRequest} events")))
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ } ~
+ // GET /stats.json : asks the StatsActor for the statistics of the
+ // authenticated app. When config.stats is off the answer is 404 with a hint
+ // to start the server with --stats.
+ path("stats.json") {
+
+ import Json4sProtocol._
+
+ get {
+ handleExceptions(Common.exceptionHandler) {
+ handleRejections(rejectionHandler) {
+ authenticate(withAccessKey) { authData =>
+ val appId = authData.appId
+ respondWithMediaType(MediaTypes.`application/json`) {
+ if (config.stats) {
+ complete {
+ statsActorRef ? GetStats(appId) map {
+ _.asInstanceOf[Map[String, StatsSnapshot]]
+ }
+ }
+ } else {
+ complete(
+ StatusCodes.NotFound,
+ parse("""{"message": "To see stats, launch Event Server """ +
+ """with --stats argument."}"""))
+ }
+ }
+ }
+ }
+ }
+ } // stats.json get
+ } ~
+ // /webhooks/<name>.json : JSON webhooks. POST hands the JSON body to
+ // Webhooks.postJson; GET calls Webhooks.getJson for the same name.
+ path("webhooks" / jsonPath ) { web =>
+ import Json4sProtocol._
+
+ post {
+ handleExceptions(Common.exceptionHandler) {
+ handleRejections(rejectionHandler) {
+ authenticate(withAccessKey) { authData =>
+ val appId = authData.appId
+ val channelId = authData.channelId
+ respondWithMediaType(MediaTypes.`application/json`) {
+ entity(as[JObject]) { jObj =>
+ complete {
+ Webhooks.postJson(
+ appId = appId,
+ channelId = channelId,
+ web = web,
+ data = jObj,
+ eventClient = eventClient,
+ log = logger,
+ stats = config.stats,
+ statsActorRef = statsActorRef)
+ }
+ }
+ }
+ }
+ }
+ }
+ } ~
+ get {
+ handleExceptions(Common.exceptionHandler) {
+ handleRejections(rejectionHandler) {
+ authenticate(withAccessKey) { authData =>
+ val appId = authData.appId
+ val channelId = authData.channelId
+ respondWithMediaType(MediaTypes.`application/json`) {
+ complete {
+ Webhooks.getJson(
+ appId = appId,
+ channelId = channelId,
+ web = web,
+ log = logger)
+ }
+ }
+ }
+ }
+ }
+ }
+ } ~
+ // /webhooks/<name>.form : form-encoded webhooks. POST hands the form fields
+ // to Webhooks.postForm; GET calls Webhooks.getForm for the same name.
+ // Responses are still rendered as JSON.
+ path("webhooks" / formPath ) { web =>
+ post {
+ handleExceptions(Common.exceptionHandler) {
+ handleRejections(rejectionHandler) {
+ authenticate(withAccessKey) { authData =>
+ val appId = authData.appId
+ val channelId = authData.channelId
+ respondWithMediaType(MediaTypes.`application/json`) {
+ entity(as[FormData]){ formData =>
+ // logger.debug(formData.toString)
+ complete {
+ // respond with JSON
+ import Json4sProtocol._
+
+ Webhooks.postForm(
+ appId = appId,
+ channelId = channelId,
+ web = web,
+ data = formData,
+ eventClient = eventClient,
+ log = logger,
+ stats = config.stats,
+ statsActorRef = statsActorRef)
+ }
+ }
+ }
+ }
+ }
+ }
+ } ~
+ get {
+ handleExceptions(Common.exceptionHandler) {
+ handleRejections(rejectionHandler) {
+ authenticate(withAccessKey) { authData =>
+ val appId = authData.appId
+ val channelId = authData.channelId
+ respondWithMediaType(MediaTypes.`application/json`) {
+ complete {
+ // respond with JSON
+ import Json4sProtocol._
+
+ Webhooks.getForm(
+ appId = appId,
+ channelId = channelId,
+ web = web,
+ log = logger)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+ // Actor behaviour: run incoming messages through `route`.
+ def receive: Actor.Receive = runRoute(route)
+}
+
+
+
+/* message to EventServerActor: bind the HTTP listener to the given host and port */
+case class StartServer(host: String, port: Int)
+
+/** Parent of the [[EventServiceActor]] child. On [[StartServer]] it asks the
+  * spray HTTP extension to bind that child to the requested interface and
+  * port, and it logs the bind outcome messages it receives.
+  */
+class EventServerActor(
+    val eventClient: LEvents,
+    val accessKeysClient: AccessKeys,
+    val channelsClient: Channels,
+    val config: EventServerConfig) extends Actor with ActorLogging {
+  val child = context.actorOf(
+    Props(classOf[EventServiceActor], eventClient, accessKeysClient, channelsClient, config),
+    "EventServiceActor")
+  // Implicit actor system required by IO(Http).
+  implicit val system = context.system
+
+  def receive: Actor.Receive = {
+    case StartServer(host, portNum) =>
+      val bindCommand = Http.Bind(child, interface = host, port = portNum)
+      IO(Http) ! bindCommand
+    case _: Http.Bound =>
+      log.info("Bound received. EventServer is ready.")
+    case _: Http.CommandFailed =>
+      log.error("Command failed.")
+    case _ =>
+      log.error("Unknown message.")
+  }
+}
+
+// Event server settings.
+//  - ip, port: interface and port passed to StartServer by EventServer.
+//  - plugins: not read by the code in this file; presumably a plugin
+//    directory name (TODO confirm against the launcher).
+//  - stats: when true a StatsActor is started and inserts are reported to it.
+case class EventServerConfig(
+ ip: String = "localhost",
+ port: Int = 7070,
+ plugins: String = "plugins",
+ stats: Boolean = false)
+
+object EventServer {
+  /** Starts the event server actors and blocks until the actor system ends.
+    *
+    * Creates the "EventServerSystem" actor system, the EventServerActor wired
+    * to the storage clients, the StatsActor (only when `config.stats` is set)
+    * and the PluginsActor, then sends StartServer with the configured ip/port.
+    */
+  def createEventServer(config: EventServerConfig): Unit = {
+    implicit val system = ActorSystem("EventServerSystem")
+
+    val serverProps = Props(
+      classOf[EventServerActor],
+      Storage.getLEvents(),
+      Storage.getMetaDataAccessKeys(),
+      Storage.getMetaDataChannels(),
+      config)
+    val serverActor = system.actorOf(serverProps, "EventServerActor")
+
+    if (config.stats) {
+      system.actorOf(Props[StatsActor], "StatsActor")
+    }
+    system.actorOf(Props[PluginsActor], "PluginsActor")
+
+    serverActor ! StartServer(config.ip, config.port)
+    system.awaitTermination()
+  }
+}
+
+object Run {
+  /** Entry point: starts an event server on 0.0.0.0:7070 with default settings otherwise. */
+  def main(args: Array[String]): Unit = {
+    val config = EventServerConfig(ip = "0.0.0.0", port = 7070)
+    EventServer.createEventServer(config)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/EventServerPlugin.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServerPlugin.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPlugin.scala
new file mode 100644
index 0000000..c4918c2
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPlugin.scala
@@ -0,0 +1,33 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.api
+
+/** Contract implemented by event server plugins.
+  *
+  * Implementations are discovered through `java.util.ServiceLoader` by
+  * [[EventServerPluginContext]] and registered under their `pluginType` and
+  * `pluginName`.
+  */
+trait EventServerPlugin {
+  /** Name the plugin is registered under; also used to address it over REST. */
+  val pluginName: String
+
+  /** Human-readable description, reported by the plugin listing. */
+  val pluginDescription: String
+
+  /** Category the plugin is registered under; see the constants in the
+    * companion object.
+    */
+  val pluginType: String
+
+  /** Lifecycle hook receiving the plugin context.
+    * NOTE(review): no caller is visible in this file set; confirm when it runs.
+    */
+  def start(context: EventServerPluginContext): Unit
+
+  /** Called by the event server for an incoming event.
+    *
+    * @param eventInfo the event with the app and channel it was sent to
+    * @param context   the plugin context of the calling component
+    */
+  def process(eventInfo: EventInfo, context: EventServerPluginContext): Unit
+
+  /** Serves a REST call addressed to this plugin.
+    *
+    * @param appId     app of the authenticated caller
+    * @param channelId channel of the authenticated caller, if any
+    * @param arguments path segments following the plugin name
+    * @return the response body
+    */
+  def handleREST(appId: Int, channelId: Option[Int], arguments: Seq[String]): String
+}
+
+// Plugin type names; they are the keys of the plugin registry and the first
+// path segment of the plugin REST endpoint.
+object EventServerPlugin {
+ val inputBlocker = "inputblocker"
+ val inputSniffer = "inputsniffer"
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala
new file mode 100644
index 0000000..db5743b
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala
@@ -0,0 +1,49 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.api
+
+import java.util.ServiceLoader
+
+import akka.event.LoggingAdapter
+import grizzled.slf4j.Logging
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/** Holds the loaded plugins, grouped by plugin type and then by plugin name,
+  * together with a logging adapter.
+  */
+class EventServerPluginContext(
+    val plugins: mutable.Map[String, mutable.Map[String, EventServerPlugin]],
+    val log: LoggingAdapter) {
+
+  /** Immutable copy of the plugins registered under `pluginType`; empty when
+    * that type has no entry.
+    */
+  private def snapshotOf(pluginType: String): Map[String, EventServerPlugin] =
+    plugins.get(pluginType) match {
+      case Some(registered) => registered.toMap
+      case None => Map.empty[String, EventServerPlugin]
+    }
+
+  def inputBlockers: Map[String, EventServerPlugin] =
+    snapshotOf(EventServerPlugin.inputBlocker)
+
+  def inputSniffers: Map[String, EventServerPlugin] =
+    snapshotOf(EventServerPlugin.inputSniffer)
+}
+
+object EventServerPluginContext extends Logging {
+  /** Builds a context from every [[EventServerPlugin]] found by
+    * `java.util.ServiceLoader`.
+    *
+    * Plugins are registered by `pluginName` under their `pluginType`. A plugin
+    * declaring a type other than the two known ones is skipped with a warning,
+    * so that it cannot abort construction of the context.
+    *
+    * @param log logging adapter stored in the context and used for warnings
+    */
+  def apply(log: LoggingAdapter): EventServerPluginContext = {
+    val plugins = mutable.Map[String, mutable.Map[String, EventServerPlugin]](
+      EventServerPlugin.inputBlocker -> mutable.Map(),
+      EventServerPlugin.inputSniffer -> mutable.Map())
+    val serviceLoader = ServiceLoader.load(classOf[EventServerPlugin])
+    serviceLoader foreach { service =>
+      plugins.get(service.pluginType) match {
+        case Some(registry) =>
+          registry += service.pluginName -> service
+        case None =>
+          log.warning(
+            s"Ignoring plugin ${service.pluginName} (${service.getClass.getName}): " +
+              s"unknown plugin type '${service.pluginType}'.")
+      }
+    }
+    new EventServerPluginContext(
+      plugins,
+      log)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/PluginsActor.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/PluginsActor.scala b/data/src/main/scala/org/apache/predictionio/data/api/PluginsActor.scala
new file mode 100644
index 0000000..e6c1ae8
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/PluginsActor.scala
@@ -0,0 +1,52 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.api
+
+import akka.actor.Actor
+import akka.event.Logging
+
+/** Actor hosting the input sniffer plugins: it feeds them incoming events and
+  * answers REST calls addressed to one of them.
+  */
+class PluginsActor() extends Actor {
+  implicit val system = context.system
+  val log = Logging(system, this)
+
+  val pluginContext = EventServerPluginContext(log)
+
+  def receive: PartialFunction[Any, Unit] = {
+    case eventInfo: EventInfo =>
+      // Every sniffer sees every event.
+      for (sniffer <- pluginContext.inputSniffers.values) {
+        sniffer.process(eventInfo, pluginContext)
+      }
+    case PluginsActor.HandleREST(pluginName, appId, channelId, pluginArgs) =>
+      try {
+        val sniffer = pluginContext.inputSniffers(pluginName)
+        sender() ! sniffer.handleREST(appId, channelId, pluginArgs)
+      } catch {
+        // Any failure (including an unknown plugin name) is reported back to
+        // the asker as a message string instead of leaving the ask unanswered.
+        case failure: Exception =>
+          sender() ! s"""{"message":"${failure.getMessage}"}"""
+      }
+    case _ =>
+      log.error("Unknown message sent to Event Server input sniffer plugin host.")
+  }
+}
+
+object PluginsActor {
+ // Request to run handleREST on the input sniffer registered as `pluginName`;
+ // the actor replies to the sender with a String.
+ case class HandleREST(
+ pluginName: String,
+ appId: Int,
+ channelId: Option[Int],
+ pluginArgs: Seq[String])
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
new file mode 100644
index 0000000..231d101
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
@@ -0,0 +1,79 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.storage.Event
+
+import spray.http.StatusCode
+
+import scala.collection.mutable.{ HashMap => MHashMap }
+import scala.collection.mutable
+
+import com.github.nscala_time.time.Imports.DateTime
+
+/** Counter key describing an event by entity type, optional target entity
+  * type and event name.
+  */
+case class EntityTypesEvent(
+    entityType: String,
+    targetEntityType: Option[String],
+    event: String) {
+
+  /** Builds the key from the same-named fields of `e`. */
+  def this(e: Event) = this(e.entityType, e.targetEntityType, e.event)
+}
+
+// Generic key/value pair; used for the entries of a StatsSnapshot.
+case class KV[K, V](key: K, value: V)
+
+/** Statistics of one app as returned by `Stats.get`: the start of the window,
+  * its end if one has been set, and the per-event-shape and per-status-code
+  * counts.
+  */
+case class StatsSnapshot(
+    startTime: DateTime,
+    endTime: Option[DateTime],
+    basic: Seq[KV[EntityTypesEvent, Long]],
+    statusCode: Seq[KV[StatusCode, Long]])
+
+
+/** Mutable counters for a time window starting at `startTime`.
+  *
+  * Counts are kept per (app id, status code) and per (app id, event shape);
+  * missing keys count as 0.
+  */
+class Stats(val startTime: DateTime) {
+  private[this] var _endTime: Option[DateTime] = None
+  var statusCodeCount = MHashMap[(Int, StatusCode), Long]().withDefaultValue(0L)
+  var eteCount = MHashMap[(Int, EntityTypesEvent), Long]().withDefaultValue(0L)
+
+  /** Records `endTime` as the end of this window. */
+  def cutoff(endTime: DateTime): Unit = {
+    _endTime = Some(endTime)
+  }
+
+  /** Increments the status-code counter and the event-shape counter of `appId`. */
+  def update(appId: Int, statusCode: StatusCode, event: Event): Unit = {
+    statusCodeCount((appId, statusCode)) += 1
+    val eventKey = (appId, new EntityTypesEvent(event))
+    eteCount(eventKey) += 1
+  }
+
+  /** Returns the entries of `m` belonging to `appId`, with the app id removed
+    * from the key.
+    */
+  def extractByAppId[K, V](appId: Int, m: mutable.Map[(Int, K), V])
+    : Seq[KV[K, V]] =
+    m.toSeq.collect {
+      case ((id, key), value) if id == appId => KV(key, value)
+    }
+
+  /** Snapshot of the counters recorded for `appId`. */
+  def get(appId: Int): StatsSnapshot = {
+    val byEventShape = extractByAppId(appId, eteCount)
+    val byStatusCode = extractByAppId(appId, statusCodeCount)
+    StatsSnapshot(startTime, _endTime, byEventShape, byStatusCode)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala b/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala
new file mode 100644
index 0000000..a8ed3e7
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala
@@ -0,0 +1,74 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.storage.Event
+
+import spray.http.StatusCode
+
+import akka.actor.Actor
+import akka.event.Logging
+
+import com.github.nscala_time.time.Imports.DateTime
+
+/* message to StatsActor: count one event of `appId` and the status code it was answered with */
+case class Bookkeeping(appId: Int, statusCode: StatusCode, event: Event)
+
+/* message to StatsActor: ask for the statistics recorded for `appId` */
+case class GetStats(appId: Int)
+
+/** Actor keeping event statistics in three windows: since start-up
+  * (`longLiveStats`), the current clock hour (`hourlyStats`) and the window
+  * before it (`prevHourlyStats`).
+  */
+class StatsActor extends Actor {
+  implicit val system = context.system
+  val log = Logging(system, this)
+
+  /** The current time truncated to the start of the hour. */
+  def getCurrent: DateTime = {
+    DateTime.now.
+      withMinuteOfHour(0).
+      withSecondOfMinute(0).
+      withMillisOfSecond(0)
+  }
+
+  var longLiveStats = new Stats(DateTime.now)
+  var hourlyStats = new Stats(getCurrent)
+
+  var prevHourlyStats = new Stats(getCurrent.minusHours(1))
+  prevHourlyStats.cutoff(hourlyStats.startTime)
+
+  /** Starts a new hourly window when the clock hour differs from the one
+    * `hourlyStats` was started in; the old window becomes `prevHourlyStats`,
+    * cut off at the new hour.
+    */
+  private def rollOverIfHourChanged(): Unit = {
+    val current = getCurrent
+    if (current != hourlyStats.startTime) {
+      prevHourlyStats = hourlyStats
+      prevHourlyStats.cutoff(current)
+      hourlyStats = new Stats(current)
+    }
+  }
+
+  /** Counts one event in the hourly and the long-lived window. */
+  def bookkeeping(appId: Int, statusCode: StatusCode, event: Event): Unit = {
+    rollOverIfHourChanged()
+
+    hourlyStats.update(appId, statusCode, event)
+    longLiveStats.update(appId, statusCode, event)
+  }
+
+  def receive: Actor.Receive = {
+    case Bookkeeping(appId, statusCode, event) =>
+      bookkeeping(appId, statusCode, event)
+    case GetStats(appId) =>
+      // Roll over before reading as well: otherwise a query arriving after the
+      // hour changed, with no event since, would report the previous hour's
+      // window as "currentHour".
+      rollOverIfHourChanged()
+      sender() ! Map(
+        "time" -> DateTime.now,
+        "currentHour" -> hourlyStats.get(appId),
+        "prevHour" -> prevHourlyStats.get(appId),
+        "longLive" -> longLiveStats.get(appId))
+    case _ => log.error("Unknown message.")
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
new file mode 100644
index 0000000..04ff78f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
@@ -0,0 +1,151 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.webhooks.JsonConnector
+import org.apache.predictionio.data.webhooks.FormConnector
+import org.apache.predictionio.data.webhooks.ConnectorUtil
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventJson4sSupport
+import org.apache.predictionio.data.storage.LEvents
+
+import spray.routing._
+import spray.routing.Directives._
+import spray.http.StatusCodes
+import spray.http.StatusCode
+import spray.http.FormData
+import spray.httpx.Json4sSupport
+
+import org.json4s.Formats
+import org.json4s.DefaultFormats
+import org.json4s.JObject
+
+import akka.event.LoggingAdapter
+import akka.actor.ActorSelection
+
+import scala.concurrent.{ExecutionContext, Future}
+
+
+/** Handlers behind the `/webhooks/<name>.json` and `/webhooks/<name>.form`
+  * routes. The webhook name selects a connector from [[WebhooksConnectors]];
+  * a name without connector is answered with 404.
+  */
+private[predictionio] object Webhooks {
+
+  /** 404 response for a webhook name that has no registered connector. */
+  private def unsupported(web: String): (StatusCode, Map[String, String]) = {
+    val message = s"webhooks connection for ${web} is not supported."
+    (StatusCodes.NotFound, Map("message" -> message))
+  }
+
+  /** Inserts the converted event, if there is one, and builds the response:
+    * 201 with the new event id, or the 404 of `unsupported` when `eventOpt`
+    * is empty. Reports the insert to the stats actor when `stats` is set.
+    */
+  private def insertConverted(
+    eventOpt: Option[Event],
+    web: String,
+    appId: Int,
+    channelId: Option[Int],
+    eventClient: LEvents,
+    stats: Boolean,
+    statsActorRef: ActorSelection
+  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
+    eventOpt match {
+      case None =>
+        // Nothing to compute asynchronously: answer with a completed future.
+        Future.successful(unsupported(web))
+      case Some(event) =>
+        eventClient.futureInsert(event, appId, channelId).map { id =>
+          val result: (StatusCode, Map[String, String]) =
+            (StatusCodes.Created, Map("eventId" -> s"${id}"))
+
+          if (stats) {
+            statsActorRef ! Bookkeeping(appId, result._1, event)
+          }
+          result
+        }
+    }
+  }
+
+  /** Converts a JSON webhook payload with the JSON connector registered for
+    * `web` and stores the resulting event.
+    *
+    * @return 201 with `eventId`, or 404 when `web` has no JSON connector; the
+    *         future fails when conversion or insertion throws
+    */
+  def postJson(
+    appId: Int,
+    channelId: Option[Int],
+    web: String,
+    data: JObject,
+    eventClient: LEvents,
+    log: LoggingAdapter,
+    stats: Boolean,
+    statsActorRef: ActorSelection
+  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
+
+    val eventFuture = Future {
+      WebhooksConnectors.json.get(web).map { connector =>
+        ConnectorUtil.toEvent(connector, data)
+      }
+    }
+
+    eventFuture.flatMap { eventOpt =>
+      insertConverted(eventOpt, web, appId, channelId, eventClient, stats, statsActorRef)
+    }
+  }
+
+  /** Reports whether a JSON connector exists for `web`: 200 or 404. */
+  def getJson(
+    appId: Int,
+    channelId: Option[Int],
+    web: String,
+    log: LoggingAdapter
+  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
+    Future {
+      WebhooksConnectors.json.get(web).map { connector =>
+        (StatusCodes.OK, Map("message" -> "Ok"))
+      }.getOrElse {
+        unsupported(web)
+      }
+    }
+  }
+
+  /** Converts a form-encoded webhook payload with the form connector
+    * registered for `web` and stores the resulting event.
+    *
+    * @return 201 with `eventId`, or 404 when `web` has no form connector; the
+    *         future fails when conversion or insertion throws
+    */
+  def postForm(
+    appId: Int,
+    channelId: Option[Int],
+    web: String,
+    data: FormData,
+    eventClient: LEvents,
+    log: LoggingAdapter,
+    stats: Boolean,
+    statsActorRef: ActorSelection
+  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
+    val eventFuture = Future {
+      WebhooksConnectors.form.get(web).map { connector =>
+        ConnectorUtil.toEvent(connector, data.fields.toMap)
+      }
+    }
+
+    eventFuture.flatMap { eventOpt =>
+      insertConverted(eventOpt, web, appId, channelId, eventClient, stats, statsActorRef)
+    }
+  }
+
+  /** Reports whether a form connector exists for `web`: 200 or 404. */
+  def getForm(
+    appId: Int,
+    channelId: Option[Int],
+    web: String,
+    log: LoggingAdapter
+  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
+    Future {
+      WebhooksConnectors.form.get(web).map { connector =>
+        (StatusCodes.OK, Map("message" -> "Ok"))
+      }.getOrElse {
+        unsupported(web)
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/WebhooksConnectors.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/WebhooksConnectors.scala b/data/src/main/scala/org/apache/predictionio/data/api/WebhooksConnectors.scala
new file mode 100644
index 0000000..c2578ee
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/WebhooksConnectors.scala
@@ -0,0 +1,34 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.webhooks.JsonConnector
+import org.apache.predictionio.data.webhooks.FormConnector
+
+import org.apache.predictionio.data.webhooks.segmentio.SegmentIOConnector
+import org.apache.predictionio.data.webhooks.mailchimp.MailChimpConnector
+
+private[predictionio] object WebhooksConnectors {
+  // Connectors for JSON webhooks payloads, keyed by the connector name used for lookup.
+  val json: Map[String, JsonConnector] = Map(
+    "segmentio" -> SegmentIOConnector
+  )
+  // Connectors for form-encoded webhooks payloads, keyed the same way.
+  val form: Map[String, FormConnector] = Map(
+    "mailchimp" -> MailChimpConnector
+  )
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/package.scala b/data/src/main/scala/org/apache/predictionio/data/package.scala
new file mode 100644
index 0000000..9284787
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/package.scala
@@ -0,0 +1,21 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio
+
+/** Provides data access for PredictionIO and any engines running on top of
+ * PredictionIO
+ */
+package object data {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/AccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/AccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/AccessKeys.scala
new file mode 100644
index 0000000..3285de9
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/AccessKeys.scala
@@ -0,0 +1,71 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import java.security.SecureRandom
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.commons.codec.binary.Base64
+
+/** :: DeveloperApi ::
+ * Stores mapping of access keys, app IDs, and lists of allowed event names
+ *
+ * @param key Access key
+ * @param appid App ID
+ * @param events List of allowed events for this particular app key
+ * @group Meta Data
+ */
+@DeveloperApi
+case class AccessKey(
+ key: String,
+ appid: Int,
+ events: Seq[String])
+
+/** :: DeveloperApi ::
+ * Base trait of the [[AccessKey]] data access object
+ *
+ * @group Meta Data
+ */
+@DeveloperApi
+trait AccessKeys {
+ /** Insert a new [[AccessKey]]. If the key field is empty, a key will be
+ * generated.
+ */
+ def insert(k: AccessKey): Option[String]
+
+ /** Get an [[AccessKey]] by key */
+ def get(k: String): Option[AccessKey]
+
+ /** Get all [[AccessKey]]s */
+ def getAll(): Seq[AccessKey]
+
+ /** Get all [[AccessKey]]s for a particular app ID */
+ def getByAppid(appid: Int): Seq[AccessKey]
+
+ /** Update an [[AccessKey]] */
+ def update(k: AccessKey): Unit
+
+ /** Delete an [[AccessKey]] */
+ def delete(k: String): Unit
+
+ /** Default key generator: 48 strong-random bytes rendered as URL-safe Base64 text. */
+ def generateKey: String = {
+   val keyBytes = new Array[Byte](48) // zero-filled here, overwritten with random bytes below
+   val strongRandom = SecureRandom.getInstanceStrong
+   strongRandom.nextBytes(keyBytes)
+   Base64.encodeBase64URLSafeString(keyBytes)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Apps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Apps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Apps.scala
new file mode 100644
index 0000000..b68e1b6
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Apps.scala
@@ -0,0 +1,58 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+
+/** :: DeveloperApi ::
+ * Stores mapping of app IDs and names
+ *
+ * @param id ID of the app.
+ * @param name Name of the app.
+ * @param description Long description of the app.
+ * @group Meta Data
+ */
+@DeveloperApi
+case class App(
+ id: Int,
+ name: String,
+ description: Option[String])
+
+/** :: DeveloperApi ::
+ * Base trait of the [[App]] data access object
+ *
+ * @group Meta Data
+ */
+@DeveloperApi
+trait Apps {
+ /** Insert a new [[App]]. Returns a generated app ID if the supplied app ID is 0. */
+ def insert(app: App): Option[Int]
+
+ /** Get an [[App]] by app ID */
+ def get(id: Int): Option[App]
+
+ /** Get an [[App]] by app name */
+ def getByName(name: String): Option[App]
+
+ /** Get all [[App]]s */
+ def getAll(): Seq[App]
+
+ /** Update an [[App]] */
+ def update(app: App): Unit
+
+ /** Delete an [[App]] */
+ def delete(id: Int): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/BiMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/BiMap.scala b/data/src/main/scala/org/apache/predictionio/data/storage/BiMap.scala
new file mode 100644
index 0000000..ad845b3
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/BiMap.scala
@@ -0,0 +1,164 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import scala.collection.immutable.HashMap
+
+import org.apache.spark.rdd.RDD
+
+/** Immutable bi-directional map that exposes the reverse (value to key) view as `inverse`.
+ * Construction fails via `require` when two keys share a value, since it cannot be reversed.
+ */
+class BiMap[K, V] private[predictionio] (
+    private val m: Map[K, V],
+    private val i: Option[BiMap[V, K]] = None
+  ) extends Serializable {
+  // The constructor is restricted to the predictionio package; see the companion's factories.
+  // NOTE: make inverse's inverse point back to current BiMap
+  val inverse: BiMap[V, K] = i.getOrElse {
+    val rev = m.map(_.swap)
+    require(rev.size == m.size,
+      "Failed to create reversed map. Cannot have duplicated values.")
+    new BiMap(rev, Some(this))
+  }
+  /** Optionally returns the value mapped to `k`. */
+  def get(k: K): Option[V] = m.get(k)
+  /** Returns the value mapped to `k`, or the lazily evaluated `default` if there is none. */
+  def getOrElse(k: K, default: => V): V = m.getOrElse(k, default)
+  /** Tests whether `k` is a key of this map. */
+  def contains(k: K): Boolean = m.contains(k)
+  /** Returns the value mapped to `k`; delegates to the underlying map's `apply`. */
+  def apply(k: K): V = m.apply(k)
+
+  /** Converts to a map.
+   * @return a map of type immutable.Map[K, V]
+   */
+  def toMap: Map[K, V] = m
+
+  /** Converts to a sequence.
+   * @return a sequence containing all elements of this map
+   */
+  def toSeq: Seq[(K, V)] = m.toSeq
+  /** Number of key/value pairs. */
+  def size: Int = m.size
+  /** A new BiMap built from the first `n` entries of the underlying map. */
+  def take(n: Int): BiMap[K, V] = BiMap(m.take(n))
+  /** String form of the underlying key-to-value map. */
+  override def toString: String = m.toString
+}
+
+object BiMap {
+
+ def apply[K, V](x: Map[K, V]): BiMap[K, V] = new BiMap(x)
+
+ /** Create a BiMap[String, Long] from a set of String. The Long index starts
+ * from 0.
+ * @param keys a set of String
+ * @return a String to Long BiMap
+ */
+ def stringLong(keys: Set[String]): BiMap[String, Long] = {
+ val hm = HashMap(keys.toSeq.zipWithIndex.map(t => (t._1, t._2.toLong)) : _*)
+ new BiMap(hm)
+ }
+
+ /** Create a BiMap[String, Long] from an array of String.
+ * NOTE: the array cannot have duplicated elements.
+ * The Long index starts from 0.
+ * @param keys an array of String
+ * @return a String to Long BiMap
+ */
+ def stringLong(keys: Array[String]): BiMap[String, Long] = {
+ val hm = HashMap(keys.zipWithIndex.map(t => (t._1, t._2.toLong)) : _*)
+ new BiMap(hm)
+ }
+
+ /** Create a BiMap[String, Long] from RDD[String]. The Long index starts
+ * from 0. The distinct keys are collected locally before indexing.
+ * @param keys RDD of String
+ * @return a String to Long BiMap
+ */
+ def stringLong(keys: RDD[String]): BiMap[String, Long] = {
+ stringLong(keys.distinct.collect)
+ }
+
+ /** Create a BiMap[String, Int] from a set of String. The Int index starts
+ * from 0.
+ * @param keys a set of String
+ * @return a String to Int BiMap
+ */
+ def stringInt(keys: Set[String]): BiMap[String, Int] = {
+ val hm = HashMap(keys.toSeq.zipWithIndex : _*)
+ new BiMap(hm)
+ }
+
+ /** Create a BiMap[String, Int] from an array of String.
+ * NOTE: the array cannot have duplicated elements.
+ * The Int index starts from 0.
+ * @param keys an array of String
+ * @return a String to Int BiMap
+ */
+ def stringInt(keys: Array[String]): BiMap[String, Int] = {
+ val hm = HashMap(keys.zipWithIndex : _*)
+ new BiMap(hm)
+ }
+
+ /** Create a BiMap[String, Int] from RDD[String]. The Int index starts
+ * from 0. The distinct keys are collected locally before indexing.
+ * @param keys RDD of String
+ * @return a String to Int BiMap
+ */
+ def stringInt(keys: RDD[String]): BiMap[String, Int] = {
+ stringInt(keys.distinct.collect)
+ }
+
+ private[this] def stringDoubleImpl(keys: Seq[String])
+ : BiMap[String, Double] = {
+ val ki = keys.zipWithIndex.map(e => (e._1, e._2.toDouble))
+ new BiMap(HashMap(ki : _*))
+ }
+
+ /** Create a BiMap[String, Double] from a set of String. The Double index
+ * starts from 0.
+ * @param keys a set of String
+ * @return a String to Double BiMap
+ */
+ def stringDouble(keys: Set[String]): BiMap[String, Double] = {
+ // val hm = HashMap(keys.toSeq.zipWithIndex.map(_.toDouble) : _*)
+ // new BiMap(hm)
+ stringDoubleImpl(keys.toSeq)
+ }
+
+ /** Create a BiMap[String, Double] from an array of String.
+ * NOTE: the array cannot have duplicated elements.
+ * The Double index starts from 0.
+ * @param keys an array of String
+ * @return a String to Double BiMap
+ */
+ def stringDouble(keys: Array[String]): BiMap[String, Double] = {
+ // val hm = HashMap(keys.zipWithIndex.mapValues(_.toDouble) : _*)
+ // new BiMap(hm)
+ stringDoubleImpl(keys.toSeq)
+ }
+
+ /** Create a BiMap[String, Double] from RDD[String]. The Double index starts
+ * from 0. The distinct keys are collected locally before indexing.
+ * @param keys RDD of String
+ * @return a String to Double BiMap
+ */
+ def stringDouble(keys: RDD[String]): BiMap[String, Double] = {
+ stringDoubleImpl(keys.distinct.collect)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Channels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Channels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Channels.scala
new file mode 100644
index 0000000..e602e1e
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Channels.scala
@@ -0,0 +1,79 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+
+/** :: DeveloperApi ::
+ * Stores mapping of channel IDs, names and app ID
+ *
+ * @param id ID of the channel
+ * @param name Name of the channel (must be unique within the same app)
+ * @param appid ID of the app which this channel belongs to
+ * @group Meta Data
+ */
+@DeveloperApi
+case class Channel(
+  id: Int,
+  name: String, // must be unique within the same app
+  appid: Int
+) { // construction fails with IllegalArgumentException (from require) on an invalid name
+  require(Channel.isValidName(name),
+    s"Invalid channel name: ${name}. ${Channel.nameConstraint}")
+}
+
+/** :: DeveloperApi ::
+ * Companion object of [[Channel]]
+ *
+ * @group Meta Data
+ */
+@DeveloperApi
+object Channel {
+ /** Examine whether the supplied channel name is valid. A valid channel name
+ * must consist of 1 to 16 alphanumeric and '-' characters.
+ *
+ * @param s Channel name to examine
+ * @return true if channel name is valid, false otherwise
+ */
+ def isValidName(s: String): Boolean = {
+ // note: update nameConstraint below if this rule is changed
+ s.matches("^[a-zA-Z0-9-]{1,16}$")
+ }
+
+ /** Human-readable form of the naming rule, for consistent error message display */
+ val nameConstraint: String =
+ "Only alphanumeric and - characters are allowed and max length is 16."
+}
+
+/** :: DeveloperApi ::
+ * Base trait of the [[Channel]] data access object
+ *
+ * @group Meta Data
+ */
+@DeveloperApi
+trait Channels {
+ /** Insert a new [[Channel]]. Returns a generated channel ID if original ID is 0. */
+ def insert(channel: Channel): Option[Int]
+
+ /** Get a [[Channel]] by channel ID */
+ def get(id: Int): Option[Channel]
+
+ /** Get all [[Channel]] by app ID */
+ def getByAppid(appid: Int): Seq[Channel]
+
+ /** Delete a [[Channel]] */
+ def delete(id: Int): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/DataMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/DataMap.scala b/data/src/main/scala/org/apache/predictionio/data/storage/DataMap.scala
new file mode 100644
index 0000000..93b6f51
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/DataMap.scala
@@ -0,0 +1,241 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.json4s._
+import org.json4s.native.JsonMethods.parse
+
+import scala.collection.GenTraversableOnce
+import scala.collection.JavaConversions
+
+/** Exception class for [[DataMap]]
+ *
+ * @group Event Data
+ */
+case class DataMapException(msg: String, cause: Exception)
+ extends Exception(msg, cause) {
+ def this(msg: String) = this(msg, null)
+}
+
+/** A DataMap stores properties of the event or entity. Internally it is a Map
+ * whose keys are property names and values are corresponding JSON values
+ * respectively. Use the [[get]] method to retrieve the value of a mandatory
+ * property or use [[getOpt]] to retrieve the value of an optional property.
+ *
+ * @param fields Map of property name to JValue
+ * @group Event Data
+ */
+class DataMap (
+ val fields: Map[String, JValue]
+) extends Serializable {
+ @transient lazy implicit private val formats = DefaultFormats +
+ new DateTimeJson4sSupport.Serializer
+
+ /** Check the existence of a required property name. Throw an exception if
+ * it does not exist.
+ *
+ * @param name The property name
+ */
+ def require(name: String): Unit = {
+ if (!fields.contains(name)) {
+ throw new DataMapException(s"The field $name is required.")
+ }
+ }
+
+ /** Check if this DataMap contains a specific property.
+ *
+ * @param name The property name
+ * @return Return true if the property exists, else false.
+ */
+ def contains(name: String): Boolean = {
+ fields.contains(name)
+ }
+
+ /** Get the value of a mandatory property. Exception is thrown if the property
+ * does not exist.
+ *
+ * @tparam T The type of the property value
+ * @param name The property name
+ * @return Return the property value of type T
+ */
+ def get[T: Manifest](name: String): T = {
+ require(name)
+ fields(name) match {
+ case JNull => throw new DataMapException(
+ s"The required field $name cannot be null.")
+ case x: JValue => x.extract[T]
+ }
+ }
+
+ /** Get the value of an optional property. Return None if the property does
+ * not exist.
+ *
+ * @tparam T The type of the property value
+ * @param name The property name
+ * @return Return the property value of type Option[T]
+ */
+ def getOpt[T: Manifest](name: String): Option[T] = {
+ // either the field doesn't exist or its value is null
+ fields.get(name).flatMap(_.extract[Option[T]])
+ }
+
+ /** Get the value of an optional property. Return default value if the
+ * property does not exist.
+ *
+ * @tparam T The type of the property value
+ * @param name The property name
+ * @param default The default property value of type T
+ * @return Return the property value of type T
+ */
+ def getOrElse[T: Manifest](name: String, default: T): T = {
+ getOpt[T](name).getOrElse(default)
+ }
+
+ /** Java-friendly method for getting the value of a property. Return null if the
+ * property does not exist.
+ *
+ * @tparam T The type of the property value
+ * @param name The property name
+ * @param clazz The class of the type of the property value
+ * @return Return the property value of type T
+ */
+ def get[T](name: String, clazz: java.lang.Class[T]): T = {
+ val manifest = new Manifest[T] {
+ override def erasure: Class[_] = clazz
+ override def runtimeClass: Class[_] = clazz
+ }
+
+ fields.get(name) match {
+ case None => null.asInstanceOf[T]
+ case Some(JNull) => null.asInstanceOf[T]
+ case Some(x) => x.extract[T](formats, manifest)
+ }
+ }
+
+ /** Java-friendly method for getting a list of values of a property. Return null if the
+ * property does not exist.
+ *
+ * @param name The property name
+ * @return Return the list of property values
+ */
+ def getStringList(name: String): java.util.List[String] = {
+ fields.get(name) match {
+ case None => null
+ case Some(JNull) => null
+ case Some(x) =>
+ JavaConversions.seqAsJavaList(x.extract[List[String]](formats, manifest[List[String]]))
+ }
+ }
+
+ /** Return a new DataMap with elements containing elements from the left hand
+ * side operand followed by elements from the right hand side operand.
+ *
+ * @param that Right hand side DataMap
+ * @return A new DataMap
+ */
+ def ++ (that: DataMap): DataMap = DataMap(this.fields ++ that.fields)
+
+ /** Creates a new DataMap from this DataMap by removing all elements of
+ * another collection.
+ *
+ * @param that A collection containing the removed property names
+ * @return A new DataMap
+ */
+ def -- (that: GenTraversableOnce[String]): DataMap =
+ DataMap(this.fields -- that)
+
+ /** Tests whether the DataMap is empty.
+ *
+ * @return true if the DataMap is empty, false otherwise.
+ */
+ def isEmpty: Boolean = fields.isEmpty
+
+ /** Collects all property names of this DataMap in a set.
+ *
+ * @return a set containing all property names of this DataMap.
+ */
+ def keySet: Set[String] = this.fields.keySet
+
+ /** Converts this DataMap to a List.
+ *
+ * @return a list of (property name, JSON value) tuples.
+ */
+ def toList(): List[(String, JValue)] = fields.toList
+
+ /** Converts this DataMap to a JObject.
+ *
+ * @return the JObject initialized by this DataMap.
+ */
+ def toJObject(): JObject = JObject(toList())
+
+ /** Converts this DataMap to case class of type T.
+ *
+ * @return the object of type T.
+ */
+ def extract[T: Manifest]: T = {
+ toJObject().extract[T]
+ }
+
+ override
+ def toString: String = s"DataMap($fields)"
+
+ override
+ def hashCode: Int = 41 + fields.hashCode
+
+ override
+ def equals(other: Any): Boolean = other match {
+ case that: DataMap => that.canEqual(this) && this.fields.equals(that.fields)
+ case _ => false
+ }
+
+ def canEqual(other: Any): Boolean = other.isInstanceOf[DataMap]
+}
+
+/** Companion object of the [[DataMap]] class
+ *
+ * @group Event Data
+ */
+object DataMap {
+ /** Create an empty DataMap
+ * @return an empty DataMap
+ */
+ def apply(): DataMap = new DataMap(Map[String, JValue]())
+
+ /** Create a DataMap from a Map of String to JValue
+ * @param fields a Map of String to JValue
+ * @return a new DataMap initialized by fields
+ */
+ def apply(fields: Map[String, JValue]): DataMap = new DataMap(fields)
+
+ /** Create a DataMap from a JObject
+ * @param jObj JObject; null is accepted
+ * @return a new DataMap initialized by the JObject, or an empty DataMap if jObj is null
+ */
+ def apply(jObj: JObject): DataMap = {
+ if (jObj == null) {
+ apply()
+ } else {
+ new DataMap(jObj.obj.toMap)
+ }
+ }
+
+ /** Create a DataMap from a JSON String; the parsed value is cast to JObject
+ * @param js JSON String of an object. eg """{ "a": 1, "b": "foo" }"""
+ * @return a new DataMap initialized by a JSON string
+ */
+ def apply(js: String): DataMap = apply(parse(js).asInstanceOf[JObject])
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/DateTimeJson4sSupport.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/DateTimeJson4sSupport.scala b/data/src/main/scala/org/apache/predictionio/data/storage/DateTimeJson4sSupport.scala
new file mode 100644
index 0000000..b3789a4
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/DateTimeJson4sSupport.scala
@@ -0,0 +1,47 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.data.{Utils => DataUtils}
+import org.joda.time.DateTime
+import org.json4s._
+
+/** :: DeveloperApi ::
+ * JSON4S serializer for Joda-Time
+ *
+ * @group Common
+ */
+@DeveloperApi
+object DateTimeJson4sSupport {
+
+ @transient lazy implicit val formats = DefaultFormats
+
+ /** Serialize DateTime to JValue */
+ def serializeToJValue: PartialFunction[Any, JValue] = {
+ case d: DateTime => JString(DataUtils.dateTimeToString(d))
+ }
+
+ /** Deserialize JValue to DateTime */
+ def deserializeFromJValue: PartialFunction[JValue, DateTime] = {
+ case jv: JValue => DataUtils.stringToDateTime(jv.extract[String])
+ }
+
+ /** Custom JSON4S serializer for Joda-Time */
+ class Serializer extends CustomSerializer[DateTime](format => (
+ deserializeFromJValue, serializeToJValue))
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala
new file mode 100644
index 0000000..bc71f3f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala
@@ -0,0 +1,177 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import com.github.nscala_time.time.Imports._
+import org.apache.predictionio.annotation.DeveloperApi
+import org.json4s._
+
+/** :: DeveloperApi ::
+ * Stores parameters, model, and other information for each engine instance
+ *
+ * @param id Engine instance ID.
+ * @param status Status of the engine instance.
+ * @param startTime Start time of the training/evaluation.
+ * @param endTime End time of the training/evaluation.
+ * @param engineId Engine ID of the instance.
+ * @param engineVersion Engine version of the instance.
+ * @param engineVariant Engine variant ID of the instance.
+ * @param engineFactory Engine factory class for the instance.
+ * @param batch A batch label of the engine instance.
+ * @param env The environment in which the instance was created.
+ * @param sparkConf Custom Spark configuration of the instance.
+ * @param dataSourceParams Data source parameters of the instance.
+ * @param preparatorParams Preparator parameters of the instance.
+ * @param algorithmsParams Algorithms parameters of the instance.
+ * @param servingParams Serving parameters of the instance.
+ * @group Meta Data
+ */
+@DeveloperApi
+case class EngineInstance(
+ id: String,
+ status: String,
+ startTime: DateTime,
+ endTime: DateTime,
+ engineId: String,
+ engineVersion: String,
+ engineVariant: String,
+ engineFactory: String,
+ batch: String,
+ env: Map[String, String],
+ sparkConf: Map[String, String],
+ dataSourceParams: String,
+ preparatorParams: String,
+ algorithmsParams: String,
+ servingParams: String)
+
+/** :: DeveloperApi ::
+ * Base trait of the [[EngineInstance]] data access object
+ *
+ * @group Meta Data
+ */
+@DeveloperApi
+trait EngineInstances {
+ /** Insert a new [[EngineInstance]] */
+ def insert(i: EngineInstance): String
+
+ /** Get an [[EngineInstance]] by ID */
+ def get(id: String): Option[EngineInstance]
+
+ /** Get all [[EngineInstance]]s */
+ def getAll(): Seq[EngineInstance]
+
+ /** Get an instance that has started training the latest and has trained to
+ * completion
+ */
+ def getLatestCompleted(
+ engineId: String,
+ engineVersion: String,
+ engineVariant: String): Option[EngineInstance]
+
+ /** Get all instances that have trained to completion */
+ def getCompleted(
+ engineId: String,
+ engineVersion: String,
+ engineVariant: String): Seq[EngineInstance]
+
+ /** Update an [[EngineInstance]] */
+ def update(i: EngineInstance): Unit
+
+ /** Delete an [[EngineInstance]] */
+ def delete(id: String): Unit
+}
+
+/** :: DeveloperApi ::
+ * JSON4S serializer for [[EngineInstance]]
+ *
+ * @group Meta Data
+ */
+@DeveloperApi
+class EngineInstanceSerializer
+ extends CustomSerializer[EngineInstance](
+ format => ({
+ case JObject(fields) =>
+ implicit val formats = DefaultFormats
+ val seed = EngineInstance(
+ id = "",
+ status = "",
+ startTime = DateTime.now,
+ endTime = DateTime.now,
+ engineId = "",
+ engineVersion = "",
+ engineVariant = "",
+ engineFactory = "",
+ batch = "",
+ env = Map(),
+ sparkConf = Map(),
+ dataSourceParams = "",
+ preparatorParams = "",
+ algorithmsParams = "",
+ servingParams = "")
+ fields.foldLeft(seed) { case (i, field) =>
+ field match {
+ case JField("id", JString(id)) => i.copy(id = id)
+ case JField("status", JString(status)) => i.copy(status = status)
+ case JField("startTime", JString(startTime)) =>
+ i.copy(startTime = Utils.stringToDateTime(startTime))
+ case JField("endTime", JString(endTime)) =>
+ i.copy(endTime = Utils.stringToDateTime(endTime))
+ case JField("engineId", JString(engineId)) =>
+ i.copy(engineId = engineId)
+ case JField("engineVersion", JString(engineVersion)) =>
+ i.copy(engineVersion = engineVersion)
+ case JField("engineVariant", JString(engineVariant)) =>
+ i.copy(engineVariant = engineVariant)
+ case JField("engineFactory", JString(engineFactory)) =>
+ i.copy(engineFactory = engineFactory)
+ case JField("batch", JString(batch)) => i.copy(batch = batch)
+ case JField("env", env) =>
+ i.copy(env = Extraction.extract[Map[String, String]](env))
+ case JField("sparkConf", sparkConf) =>
+ i.copy(sparkConf = Extraction.extract[Map[String, String]](sparkConf))
+ case JField("dataSourceParams", JString(dataSourceParams)) =>
+ i.copy(dataSourceParams = dataSourceParams)
+ case JField("preparatorParams", JString(preparatorParams)) =>
+ i.copy(preparatorParams = preparatorParams)
+ case JField("algorithmsParams", JString(algorithmsParams)) =>
+ i.copy(algorithmsParams = algorithmsParams)
+ case JField("servingParams", JString(servingParams)) =>
+ i.copy(servingParams = servingParams)
+ case _ => i
+ }
+ }
+ },
+ {
+ case i: EngineInstance =>
+ JObject(
+ JField("id", JString(i.id)) ::
+ JField("status", JString(i.status)) ::
+ JField("startTime", JString(i.startTime.toString)) ::
+ JField("endTime", JString(i.endTime.toString)) ::
+ JField("engineId", JString(i.engineId)) ::
+ JField("engineVersion", JString(i.engineVersion)) ::
+ JField("engineVariant", JString(i.engineVariant)) ::
+ JField("engineFactory", JString(i.engineFactory)) ::
+ JField("batch", JString(i.batch)) ::
+ JField("env", Extraction.decompose(i.env)(DefaultFormats)) ::
+ JField("sparkConf", Extraction.decompose(i.sparkConf)(DefaultFormats)) ::
+ JField("dataSourceParams", JString(i.dataSourceParams)) ::
+ JField("preparatorParams", JString(i.preparatorParams)) ::
+ JField("algorithmsParams", JString(i.algorithmsParams)) ::
+ JField("servingParams", JString(i.servingParams)) ::
+ Nil)
+ }
+))
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala
new file mode 100644
index 0000000..372a2e7
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala
@@ -0,0 +1,117 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.json4s._
+
+/** :: DeveloperApi ::
+ * Provides a way to discover engines by ID and version in a distributed
+ * environment.
+ *
+ * @param id Unique identifier of an engine.
+ * @param version Engine version string.
+ * @param name A short and descriptive name for the engine.
+ * @param description An optional long description of the engine; `None` when
+ *                    no description is available.
+ * @param files Paths to engine files.
+ * @param engineFactory Engine's factory class name.
+ * @group Meta Data
+ */
+@DeveloperApi
+case class EngineManifest(
+ id: String,
+ version: String,
+ name: String,
+ description: Option[String],
+ files: Seq[String],
+ engineFactory: String)
+
+/** :: DeveloperApi ::
+ * Base trait of the [[EngineManifest]] data access object
+ *
+ * Lookups and deletions address a manifest by the pair of engine ID and
+ * version, not by ID alone.
+ *
+ * @group Meta Data
+ */
+@DeveloperApi
+trait EngineManifests {
+ /** Inserts an [[EngineManifest]] */
+ def insert(engineManifest: EngineManifest): Unit
+
+ /** Get an [[EngineManifest]] by its ID and version
+ *
+ * NOTE(review): presumably `None` when nothing matches - confirm against
+ * implementations.
+ */
+ def get(id: String, version: String): Option[EngineManifest]
+
+ /** Get all [[EngineManifest]]s */
+ def getAll(): Seq[EngineManifest]
+
+ /** Updates an [[EngineManifest]]
+ *
+ * `upsert` defaults to `false`. NOTE(review): presumably `true` means the
+ * manifest is inserted when it does not exist yet - the trait itself does
+ * not define this; confirm against implementations.
+ */
+ def update(engineInfo: EngineManifest, upsert: Boolean = false): Unit
+
+ /** Delete an [[EngineManifest]] by its ID and version */
+ def delete(id: String, version: String): Unit
+}
+
+/** :: DeveloperApi ::
+  * JSON4S serializer for [[EngineManifest]]
+  *
+  * Reading: starts from a manifest with empty strings, no description and no
+  * files, then overwrites one attribute per recognised JSON field. Fields with
+  * an unknown name or an unexpected JSON type are ignored. Entries of the
+  * `files` array that are not JSON strings become empty strings.
+  *
+  * Writing: emits `id`, `version`, `name`, `description`, `files` and
+  * `engineFactory`, in that order. A missing description is written as
+  * `JNothing`.
+  *
+  * @group Meta Data
+  */
+@DeveloperApi
+class EngineManifestSerializer
+    extends CustomSerializer[EngineManifest](_ => (
+  {
+    case JObject(members) =>
+      val blank = EngineManifest(
+        id = "",
+        version = "",
+        name = "",
+        description = None,
+        files = Nil,
+        engineFactory = "")
+      members.foldLeft(blank) {
+        case (acc, JField("id", JString(v))) =>
+          acc.copy(id = v)
+        case (acc, JField("version", JString(v))) =>
+          acc.copy(version = v)
+        case (acc, JField("name", JString(v))) =>
+          acc.copy(name = v)
+        case (acc, JField("description", JString(v))) =>
+          acc.copy(description = Some(v))
+        case (acc, JField("files", JArray(items))) =>
+          val paths = items.map {
+            case JString(path) => path
+            case _ => ""
+          }
+          acc.copy(files = paths)
+        case (acc, JField("engineFactory", JString(v))) =>
+          acc.copy(engineFactory = v)
+        case (acc, _) =>
+          acc
+      }
+  },
+  {
+    case manifest: EngineManifest =>
+      val descriptionJson: JValue = manifest.description match {
+        case Some(text) => JString(text)
+        case None => JNothing
+      }
+      val filesJson = JArray(manifest.files.map(path => JString(path)).toList)
+      JObject(List(
+        JField("id", JString(manifest.id)),
+        JField("version", JString(manifest.version)),
+        JField("name", JString(manifest.name)),
+        JField("description", descriptionJson),
+        JField("files", filesJson),
+        JField("engineFactory", JString(manifest.engineFactory))))
+  }
+))
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EntityMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EntityMap.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EntityMap.scala
new file mode 100644
index 0000000..aa7224c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EntityMap.scala
@@ -0,0 +1,98 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.Experimental
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/**
+ * :: Experimental ::
+ * Two-way lookup between String entity IDs and Long indices, backed by a
+ * [[BiMap]].
+ *
+ * Every accessor comes in two overloads: the String overload is answered by
+ * `idToIx`, the Long overload by `ixToId`. `ixToId` is obtained from
+ * `idToIx.inverse` once, when the instance is constructed.
+ *
+ *  - `apply` returns the mapped value directly.
+ *  - `get` returns the mapped value wrapped in an `Option`.
+ *  - `getOrElse` returns the mapped value or the by-name `default`.
+ *  - `contains` reports whether the key is present.
+ *  - `toMap` exposes the ID-to-index direction as a plain `Map`.
+ *  - `size` is `idToIx.size` widened to `Long`.
+ *  - `take(n)` wraps `idToIx.take(n)` in a new instance.
+ *
+ * NOTE(review): what `apply` does for an unknown key is decided by [[BiMap]],
+ * which is defined elsewhere - presumably it throws; confirm.
+ *
+ * @param idToIx Mapping from entity ID to index.
+ */
+@Experimental
+class EntityIdIxMap(val idToIx: BiMap[String, Long]) extends Serializable {
+
+ val ixToId: BiMap[Long, String] = idToIx.inverse
+
+ def apply(id: String): Long = idToIx(id)
+
+ def apply(ix: Long): String = ixToId(ix)
+
+ def contains(id: String): Boolean = idToIx.contains(id)
+
+ def contains(ix: Long): Boolean = ixToId.contains(ix)
+
+ def get(id: String): Option[Long] = idToIx.get(id)
+
+ def get(ix: Long): Option[String] = ixToId.get(ix)
+
+ def getOrElse(id: String, default: => Long): Long =
+ idToIx.getOrElse(id, default)
+
+ def getOrElse(ix: Long, default: => String): String =
+ ixToId.getOrElse(ix, default)
+
+ def toMap: Map[String, Long] = idToIx.toMap
+
+ def size: Long = idToIx.size
+
+ def take(n: Int): EntityIdIxMap = new EntityIdIxMap(idToIx.take(n))
+
+ override def toString: String = idToIx.toString
+}
+
+/** :: Experimental ::
+  * Factory for [[EntityIdIxMap]].
+  */
+@Experimental
+object EntityIdIxMap {
+  /** Builds the ID-to-index mapping with `BiMap.stringLong(keys)` and wraps
+    * it in an [[EntityIdIxMap]].
+    */
+  def apply(keys: RDD[String]): EntityIdIxMap = {
+    val idToIx = BiMap.stringLong(keys)
+    new EntityIdIxMap(idToIx)
+  }
+}
+
+/** :: Experimental ::
+  * An [[EntityIdIxMap]] that additionally keeps a data value of type `A` for
+  * entity IDs.
+  *
+  * @param idToData Data keyed by entity ID.
+  * @param idToIx Mapping from entity ID to index.
+  * @tparam A Type of the data attached to an entity.
+  */
+@Experimental
+class EntityMap[A](val idToData: Map[String, A],
+  override val idToIx: BiMap[String, Long]) extends EntityIdIxMap(idToIx) {
+
+  /** Builds the ID-to-index mapping from the key set of `idToData` using
+    * `BiMap.stringLong`.
+    */
+  def this(idToData: Map[String, A]) = this(
+    idToData,
+    BiMap.stringLong(idToData.keySet)
+  )
+
+  /** Data of the entity with the given ID; the lookup is `idToData(id)`. */
+  def data(id: String): A = idToData(id)
+
+  /** Data of the entity at the given index; the index is first translated to
+    * an ID through `ixToId(ix)`.
+    */
+  def data(ix: Long): A = idToData(ixToId(ix))
+
+  /** Data of the entity with the given ID, or `None` if `idToData` has no
+    * entry for it.
+    */
+  def getData(id: String): Option[A] = idToData.get(id)
+
+  /** Data of the entity at the given index, or `None` if either the index or
+    * the resulting ID has no entry.
+    *
+    * The index is resolved with `ixToId.get` rather than `ixToId.apply`, so
+    * an unknown index is reported through the `Option` result instead of
+    * through whatever `BiMap.apply` does for a missing key.
+    */
+  def getData(ix: Long): Option[A] =
+    ixToId.get(ix).flatMap(id => idToData.get(id))
+
+  /** Data of the entity with the given ID, or `default` when absent. */
+  def getOrElseData(id: String, default: => A): A =
+    getData(id).getOrElse(default)
+
+  /** Data of the entity at the given index, or `default` when absent. */
+  def getOrElseData(ix: Long, default: => A): A =
+    getData(ix).getOrElse(default)
+
+  /** Keeps the entries selected by `idToIx.take(n)` and the data belonging to
+    * them.
+    */
+  override def take(n: Int): EntityMap[A] = {
+    val newIdToIx = idToIx.take(n)
+    // Strict `filter` instead of `filterKeys`: the latter returns a lazy view
+    // that is not serializable, which does not fit a Serializable class.
+    val keptData = idToData.filter { case (id, _) => newIdToIx.contains(id) }
+    new EntityMap[A](keptData, newIdToIx)
+  }
+
+  override def toString: String = {
+    s"idToData: ${idToData.toString} " + s"idToix: ${idToIx.toString}"
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala
new file mode 100644
index 0000000..a58e642
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala
@@ -0,0 +1,135 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import com.github.nscala_time.time.Imports._
+import org.apache.predictionio.annotation.DeveloperApi
+import org.json4s._
+
+/** :: DeveloperApi ::
+ * Stores meta information for each evaluation instance.
+ *
+ * Every field has a default, so `EvaluationInstance()` yields an instance with
+ * empty strings, empty maps, and both timestamps set to `DateTime.now` as
+ * evaluated at construction.
+ *
+ * @param id Instance ID.
+ * @param status Status of this instance.
+ * @param startTime Start time of this instance.
+ * @param endTime End time of this instance.
+ * @param evaluationClass Evaluation class name of this instance.
+ * @param engineParamsGeneratorClass Engine parameters generator class name of this instance.
+ * @param batch Batch label of this instance.
+ * @param env The environment in which this instance was created.
+ * @param sparkConf Spark configuration key-value pairs stored with this
+ *                  instance (NOTE(review): meaning inferred from the name -
+ *                  confirm).
+ * @param evaluatorResults Results of the evaluator.
+ * @param evaluatorResultsHTML HTML results of the evaluator.
+ * @param evaluatorResultsJSON JSON results of the evaluator.
+ * @group Meta Data
+ */
+@DeveloperApi
+case class EvaluationInstance(
+ id: String = "",
+ status: String = "",
+ startTime: DateTime = DateTime.now,
+ endTime: DateTime = DateTime.now,
+ evaluationClass: String = "",
+ engineParamsGeneratorClass: String = "",
+ batch: String = "",
+ env: Map[String, String] = Map(),
+ sparkConf: Map[String, String] = Map(),
+ evaluatorResults: String = "",
+ evaluatorResultsHTML: String = "",
+ evaluatorResultsJSON: String = "")
+
+/** :: DeveloperApi ::
+ * Base trait of the [[EvaluationInstance]] data access object
+ *
+ * @group Meta Data
+ */
+@DeveloperApi
+trait EvaluationInstances {
+ /** Insert a new [[EvaluationInstance]]
+ *
+ * NOTE(review): the returned String is presumably the ID assigned to the
+ * inserted instance - confirm against implementations.
+ */
+ def insert(i: EvaluationInstance): String
+
+ /** Get an [[EvaluationInstance]] by ID */
+ def get(id: String): Option[EvaluationInstance]
+
+ /** Get all [[EvaluationInstance]]s */
+ def getAll: Seq[EvaluationInstance]
+
+ /** Get instances that are produced by evaluation and have run to completion,
+ * reverse sorted by the start time
+ */
+ def getCompleted: Seq[EvaluationInstance]
+
+ /** Update an [[EvaluationInstance]] */
+ def update(i: EvaluationInstance): Unit
+
+ /** Delete an [[EvaluationInstance]] by ID */
+ def delete(id: String): Unit
+}
+
+/** :: DeveloperApi ::
+  * JSON4S serializer for [[EvaluationInstance]]
+  *
+  * Reading: starts from `EvaluationInstance()` and overwrites one attribute
+  * per recognised JSON field. Fields with an unknown name or an unexpected
+  * JSON type are ignored. `startTime` and `endTime` are parsed with
+  * `Utils.stringToDateTime`; `env` and `sparkConf` are extracted as
+  * `Map[String, String]`.
+  *
+  * Writing: emits every attribute as a JSON string, except `env` and
+  * `sparkConf`, which are decomposed into JSON values. Timestamps are written
+  * using `DateTime.toString`.
+  *
+  * @group Meta Data
+  */
+@DeveloperApi
+class EvaluationInstanceSerializer extends CustomSerializer[EvaluationInstance](
+  _ => ({
+    case JObject(fields) =>
+      // Extraction of the two maps uses DefaultFormats, not the formats handed
+      // to this serializer.
+      implicit val formats: Formats = DefaultFormats
+      fields.foldLeft(EvaluationInstance()) {
+        case (i, JField("id", JString(v))) =>
+          i.copy(id = v)
+        case (i, JField("status", JString(v))) =>
+          i.copy(status = v)
+        case (i, JField("startTime", JString(v))) =>
+          i.copy(startTime = Utils.stringToDateTime(v))
+        case (i, JField("endTime", JString(v))) =>
+          i.copy(endTime = Utils.stringToDateTime(v))
+        case (i, JField("evaluationClass", JString(v))) =>
+          i.copy(evaluationClass = v)
+        case (i, JField("engineParamsGeneratorClass", JString(v))) =>
+          i.copy(engineParamsGeneratorClass = v)
+        case (i, JField("batch", JString(v))) =>
+          i.copy(batch = v)
+        case (i, JField("env", v)) =>
+          i.copy(env = Extraction.extract[Map[String, String]](v))
+        case (i, JField("sparkConf", v)) =>
+          i.copy(sparkConf = Extraction.extract[Map[String, String]](v))
+        case (i, JField("evaluatorResults", JString(v))) =>
+          i.copy(evaluatorResults = v)
+        case (i, JField("evaluatorResultsHTML", JString(v))) =>
+          i.copy(evaluatorResultsHTML = v)
+        case (i, JField("evaluatorResultsJSON", JString(v))) =>
+          i.copy(evaluatorResultsJSON = v)
+        case (i, _) =>
+          i
+      }
+  }, {
+    case i: EvaluationInstance =>
+      JObject(List(
+        JField("id", JString(i.id)),
+        JField("status", JString(i.status)),
+        JField("startTime", JString(i.startTime.toString)),
+        JField("endTime", JString(i.endTime.toString)),
+        JField("evaluationClass", JString(i.evaluationClass)),
+        JField("engineParamsGeneratorClass", JString(i.engineParamsGeneratorClass)),
+        JField("batch", JString(i.batch)),
+        JField("env", Extraction.decompose(i.env)(DefaultFormats)),
+        JField("sparkConf", Extraction.decompose(i.sparkConf)(DefaultFormats)),
+        JField("evaluatorResults", JString(i.evaluatorResults)),
+        JField("evaluatorResultsHTML", JString(i.evaluatorResultsHTML)),
+        JField("evaluatorResultsJSON", JString(i.evaluatorResultsJSON))))
+  })
+)