You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:43 UTC

[12/34] incubator-predictionio git commit: rename all except examples

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
new file mode 100644
index 0000000..7174ec8
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
@@ -0,0 +1,640 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.api
+
+import akka.event.Logging
+import sun.misc.BASE64Decoder
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import akka.io.IO
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.predictionio.data.Utils
+import org.apache.predictionio.data.storage.AccessKeys
+import org.apache.predictionio.data.storage.Channels
+import org.apache.predictionio.data.storage.DateTimeJson4sSupport
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventJson4sSupport
+import org.apache.predictionio.data.storage.BatchEventsJson4sSupport
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.Storage
+import org.json4s.DefaultFormats
+import org.json4s.Formats
+import org.json4s.JObject
+import org.json4s.native.JsonMethods.parse
+import spray.can.Http
+import spray.http.FormData
+import spray.http.MediaTypes
+import spray.http.StatusCodes
+import spray.httpx.Json4sSupport
+import spray.routing._
+import spray.routing.authentication.Authentication
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.util.{Try, Success, Failure}
+
+class  EventServiceActor(
+    val eventClient: LEvents,
+    val accessKeysClient: AccessKeys,
+    val channelsClient: Channels,
+    val config: EventServerConfig) extends HttpServiceActor {
+
+  object Json4sProtocol extends Json4sSupport { // JSON formats imported by the routes below
+    implicit def json4sFormats: Formats = DefaultFormats +
+      new EventJson4sSupport.APISerializer +
+      new BatchEventsJson4sSupport.APISerializer +
+      // NOTE: do not use the Json4s JodaTimeSerializers here; they have issues:
+      // some formats are not converted and the time zone can come out wrong.
+      new DateTimeJson4sSupport.Serializer
+  }
+
+
+  val MaxNumberOfEventsPerBatchRequest = 50 // upper bound checked by POST /batch/events.json
+
+  val logger = Logging(context.system, this)
+
+  // The actor context's dispatcher is the ExecutionContext for every Future
+  // created in this actor.
+  implicit def executionContext: ExecutionContext = context.dispatcher
+
+  implicit val timeout = Timeout(5, TimeUnit.SECONDS) // in implicit scope for the `?` (ask) calls
+
+  val rejectionHandler = Common.rejectionHandler
+
+  val jsonPath = """(.+)\.json$""".r // captures the part of a path segment in front of ".json"
+  val formPath = """(.+)\.form$""".r // captures the part of a path segment in front of ".form"
+
+  val pluginContext = EventServerPluginContext(logger)
+
+  private lazy val base64Decoder = new BASE64Decoder // decodes Basic Authorization credentials
+
+  case class AuthData(appId: Int, channelId: Option[Int], events: Seq[String]) // auth result
+
+  /** Authenticates a request by access key and resolves it to an [[AuthData]].
+    * The `accessKey` query parameter wins (and may be combined with `channel`);
+    * otherwise the user part of a Basic `Authorization` header is used (no channel).
+    */
+  def withAccessKey: RequestContext => Future[Authentication[AuthData]] = {
+    ctx: RequestContext =>
+      val accessKeyParamOpt = ctx.request.uri.query.get("accessKey")
+      val channelParamOpt = ctx.request.uri.query.get("channel")
+      Future {
+        accessKeyParamOpt.map { accessKeyParam =>
+          accessKeysClient.get(accessKeyParam).map { k =>
+            channelParamOpt.map { ch =>
+              val channelMap =
+                channelsClient.getByAppid(k.appid).map(c => (c.name, c.id)).toMap
+              if (channelMap.contains(ch)) {
+                Right(AuthData(k.appid, Some(channelMap(ch)), k.events))
+              } else {
+                Left(ChannelRejection(s"Invalid channel '$ch'."))
+              }
+            }.getOrElse(Right(AuthData(k.appid, None, k.events)))
+          }.getOrElse(FailedAuth) // unknown access key
+        }.getOrElse {
+          // header names are case-insensitive, so compare the lower-cased name
+          ctx.request.headers.find(_.is("authorization")).map { authHeader =>
+            authHeader.value.split("Basic ") match {
+              case Array(_, value) =>
+                // credentials decode to "accessKey:password"; undecodable input is
+                // treated as rejected credentials instead of failing the Future
+                val appAccessKey = Try(
+                  new String(base64Decoder.decodeBuffer(value), "UTF-8").trim.split(":")(0))
+                appAccessKey.toOption.flatMap(key => accessKeysClient.get(key)) match {
+                  case Some(k) => Right(AuthData(k.appid, None, k.events))
+                  case None => FailedAuth
+                }
+              case _ => FailedAuth
+            }
+          }.getOrElse(MissedAuth) // neither the query parameter nor the header is present
+        }
+      }
+  }
+
+  // Outcome for credentials that were supplied but rejected.
+  private val FailedAuth = {
+    val cause = AuthenticationFailedRejection.CredentialsRejected
+    Left(AuthenticationFailedRejection(cause, Nil))
+  }
+
+  // Outcome for requests carrying neither an accessKey parameter nor an Authorization header.
+  private val MissedAuth = {
+    val cause = AuthenticationFailedRejection.CredentialsMissing
+    Left(AuthenticationFailedRejection(cause, Nil))
+  }
+
+  lazy val statsActorRef = actorRefFactory.actorSelection("/user/StatsActor")
+  lazy val pluginsActorRef = actorRefFactory.actorSelection("/user/PluginsActor")
+
+  val route: Route = // all endpoints of the event server, joined with `~`
+    pathSingleSlash { // GET / answers {"status": "alive"} without authentication
+      import Json4sProtocol._
+
+      get {
+        respondWithMediaType(MediaTypes.`application/json`) {
+          complete(Map("status" -> "alive"))
+        }
+      }
+    } ~
+    path("plugins.json") { // GET /plugins.json: unauthenticated listing of the loaded plugins
+      import Json4sProtocol._
+      get {
+        respondWithMediaType(MediaTypes.`application/json`) {
+          complete {
+            Map("plugins" -> Map(
+              "inputblockers" -> pluginContext.inputBlockers.map { case (n, p) =>
+                n -> Map(
+                  "name" -> p.pluginName,
+                  "description" -> p.pluginDescription,
+                  "class" -> p.getClass.getName)
+              },
+              "inputsniffers" -> pluginContext.inputSniffers.map { case (n, p) =>
+                n -> Map(
+                  "name" -> p.pluginName,
+                  "description" -> p.pluginDescription,
+                  "class" -> p.getClass.getName)
+              }
+            ))
+          }
+        }
+      }
+    } ~
+    path("plugins" / Segments) { segments => // GET /plugins/<type>/<name>/<args...>
+      get {
+        handleExceptions(Common.exceptionHandler) {
+          authenticate(withAccessKey) { authData =>
+            respondWithMediaType(MediaTypes.`application/json`) {
+              complete {
+                val pluginArgs = segments.drop(2)
+                val pluginType = segments(0)
+                val pluginName = segments(1) // (0)/(1) throw when fewer than two segments are given
+                pluginType match { // NOTE(review): no default case; other types raise MatchError
+                  case EventServerPlugin.inputBlocker => // blockers are called directly
+                    pluginContext.inputBlockers(pluginName).handleREST(
+                      authData.appId,
+                      authData.channelId,
+                      pluginArgs)
+                  case EventServerPlugin.inputSniffer => // sniffers are asked through PluginsActor
+                    pluginsActorRef ? PluginsActor.HandleREST(
+                      appId = authData.appId,
+                      channelId = authData.channelId,
+                      pluginName = pluginName,
+                      pluginArgs = pluginArgs) map {
+                      _.asInstanceOf[String]
+                    }
+                }
+              }
+            }
+          }
+        }
+      }
+    } ~
+    path("events" / jsonPath ) { eventId => // /events/<eventId>.json
+
+      import Json4sProtocol._
+
+      get { // look up one event of the authenticated app/channel
+        handleExceptions(Common.exceptionHandler) {
+          handleRejections(rejectionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              respondWithMediaType(MediaTypes.`application/json`) {
+                complete {
+                  logger.debug(s"GET event ${eventId}.")
+                  val data = eventClient.futureGet(eventId, appId, channelId).map { eventOpt =>
+                    eventOpt.map( event =>
+                      (StatusCodes.OK, event)
+                    ).getOrElse(
+                      (StatusCodes.NotFound, Map("message" -> "Not Found"))
+                    )
+                  }
+                  data
+                }
+              }
+            }
+          }
+        }
+      } ~
+      delete { // delete one event; 404 when the storage reports that it was not found
+        handleExceptions(Common.exceptionHandler) {
+          handleRejections(rejectionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              respondWithMediaType(MediaTypes.`application/json`) {
+                complete {
+                  logger.debug(s"DELETE event ${eventId}.")
+                  val data = eventClient.futureDelete(eventId, appId, channelId).map { found =>
+                    if (found) {
+                      (StatusCodes.OK, Map("message" -> "Found"))
+                    } else {
+                      (StatusCodes.NotFound, Map("message" -> "Not Found"))
+                    }
+                  }
+                  data
+                }
+              }
+            }
+          }
+        }
+      }
+    } ~
+    path("events.json") { // /events.json: POST creates one event, GET queries events
+
+      import Json4sProtocol._
+
+      post {
+        handleExceptions(Common.exceptionHandler) {
+          handleRejections(rejectionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              val events = authData.events
+              entity(as[Event]) { event =>
+                complete {
+                  // an empty allow-list on the access key permits every event name
+                  if (events.isEmpty || authData.events.contains(event.event)) {
+                    pluginContext.inputBlockers.values.foreach( // blockers run before the insert
+                      _.process(EventInfo(
+                        appId = appId,
+                        channelId = channelId,
+                        event = event), pluginContext))
+                    val data = eventClient.futureInsert(event, appId, channelId).map { id =>
+                      pluginsActorRef ! EventInfo( // sniffers are notified after the insert
+                        appId = appId,
+                        channelId = channelId,
+                        event = event)
+                      val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
+                      if (config.stats) {
+                        statsActorRef ! Bookkeeping(appId, result._1, event)
+                      }
+                      result
+                    }
+                    data
+                  } else {
+                    (StatusCodes.Forbidden,
+                      Map("message" -> s"${event.event} events are not allowed"))
+                  }
+                }
+              }
+            }
+          }
+        }
+      } ~
+      get {
+        handleExceptions(Common.exceptionHandler) {
+          handleRejections(rejectionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              parameters(
+                'startTime.as[Option[String]],
+                'untilTime.as[Option[String]],
+                'entityType.as[Option[String]],
+                'entityId.as[Option[String]],
+                'event.as[Option[String]],
+                'targetEntityType.as[Option[String]],
+                'targetEntityId.as[Option[String]],
+                'limit.as[Option[Int]],
+                'reversed.as[Option[Boolean]]) {
+                (startTimeStr, untilTimeStr, entityType, entityId,
+                  eventName,  // only a single event name is supported
+                  targetEntityType, targetEntityId,
+                  limit, reversed) =>
+                respondWithMediaType(MediaTypes.`application/json`) {
+                  complete {
+                    logger.debug(
+                      s"GET events of appId=${appId} " +
+                      s"st=${startTimeStr} ut=${untilTimeStr} " +
+                      s"et=${entityType} eid=${entityId} " +
+                      s"li=${limit} rev=${reversed} ")
+
+                    require(!((reversed == Some(true))
+                      && (entityType.isEmpty || entityId.isEmpty)),
+                      "the parameter reversed can only be used with" +
+                      " both entityType and entityId specified.")
+
+                    val parseTime = Future { // time strings are parsed inside a Future
+                      val startTime = startTimeStr.map(Utils.stringToDateTime(_))
+                      val untilTime = untilTimeStr.map(Utils.stringToDateTime(_))
+                      (startTime, untilTime)
+                    }
+
+
+                    parseTime.flatMap { case (startTime, untilTime) =>
+                      val data = eventClient.futureFind(
+                        appId = appId,
+                        channelId = channelId,
+                        startTime = startTime,
+                        untilTime = untilTime,
+                        entityType = entityType,
+                        entityId = entityId,
+                        eventNames = eventName.map(List(_)),
+                        targetEntityType = targetEntityType.map(Some(_)),
+                        targetEntityId = targetEntityId.map(Some(_)),
+                        limit = limit.orElse(Some(20)), // 20 events when no limit is given
+                        reversed = reversed)
+                        .map { eventIter =>
+                          if (eventIter.hasNext) {
+                            (StatusCodes.OK, eventIter.toArray)
+                          } else {
+                            (StatusCodes.NotFound,
+                              Map("message" -> "Not Found"))
+                          }
+                        }
+                      data
+                    }.recover { // a failed time parse or query is answered with 400
+                      case e: Exception =>
+                        (StatusCodes.BadRequest, Map("message" -> s"${e}"))
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    } ~
+    path("batch" / "events.json") { // POST /batch/events.json: one status entry per event
+
+      import Json4sProtocol._
+
+      post {
+        handleExceptions(Common.exceptionHandler) {
+          handleRejections(rejectionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              val allowedEvents = authData.events
+              val handleEvent: PartialFunction[Try[Event], Future[Map[String, Any]]] = {
+                case Success(event) => {
+                  if (allowedEvents.isEmpty || allowedEvents.contains(event.event)) {
+                    // An exception thrown synchronously by an input blocker (or by the insert
+                    // call) becomes this event's entry instead of aborting the whole batch.
+                    val info = EventInfo(appId = appId, channelId = channelId, event = event)
+                    val inserted = Try {
+                      pluginContext.inputBlockers.values.foreach(_.process(info, pluginContext))
+                      eventClient.futureInsert(event, appId, channelId)
+                    } match {
+                      case Success(insertion) => insertion
+                      case Failure(thrown) => Future.failed(thrown)
+                    }
+                    val data = inserted.map { id =>
+                      pluginsActorRef ! info
+                      val status = StatusCodes.Created
+                      val result = Map("status" -> status.intValue, "eventId" -> s"${id}")
+                      if (config.stats) {
+                        statsActorRef ! Bookkeeping(appId, status, event)
+                      }
+                      result
+                    }.recover { case exception =>
+                      Map(
+                        "status" -> StatusCodes.InternalServerError.intValue,
+                        "message" -> s"${exception.getMessage()}")
+                    }
+                    data
+                  } else {
+                    Future.successful(Map(
+                      "status" -> StatusCodes.Forbidden.intValue,
+                      "message" -> s"${event.event} events are not allowed"))
+                  }
+                }
+                case Failure(exception) => {
+                  Future.successful(Map(
+                    "status" -> StatusCodes.BadRequest.intValue,
+                    "message" -> s"${exception.getMessage()}"))
+                }
+              }
+
+              entity(as[Seq[Try[Event]]]) { events =>
+                complete {
+                  if (events.length <= MaxNumberOfEventsPerBatchRequest) {
+                    Future.traverse(events)(handleEvent)
+                  } else {
+                    (StatusCodes.BadRequest,
+                      Map("message" -> (s"Batch request must have less than or equal to " +
+                        s"${MaxNumberOfEventsPerBatchRequest} events")))
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    } ~
+    path("stats.json") { // GET /stats.json: statistics of the authenticated app
+
+      import Json4sProtocol._
+
+      get {
+        handleExceptions(Common.exceptionHandler) {
+          handleRejections(rejectionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              respondWithMediaType(MediaTypes.`application/json`) {
+                if (config.stats) {
+                  complete {
+                    statsActorRef ? GetStats(appId) map {
+                      _.asInstanceOf[Map[String, StatsSnapshot]]
+                    }
+                  }
+                } else {
+                  complete(
+                    StatusCodes.NotFound,
+                    parse("""{"message": "To see stats, launch Event Server """ +
+                      """with --stats argument."}"""))
+                }
+              }
+            }
+          }
+        }
+      }  // end of GET /stats.json
+    } ~
+    path("webhooks" / jsonPath ) { web => // /webhooks/<web>.json, handled by Webhooks
+      import Json4sProtocol._
+
+      post { // JSON body is passed to Webhooks.postJson together with the storage client
+        handleExceptions(Common.exceptionHandler) {
+          handleRejections(rejectionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              respondWithMediaType(MediaTypes.`application/json`) {
+                entity(as[JObject]) { jObj =>
+                  complete {
+                    Webhooks.postJson(
+                      appId = appId,
+                      channelId = channelId,
+                      web = web,
+                      data = jObj,
+                      eventClient = eventClient,
+                      log = logger,
+                      stats = config.stats,
+                      statsActorRef = statsActorRef)
+                  }
+                }
+              }
+            }
+          }
+        }
+      } ~
+      get { // delegates to Webhooks.getJson
+        handleExceptions(Common.exceptionHandler) {
+          handleRejections(rejectionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              respondWithMediaType(MediaTypes.`application/json`) {
+                complete {
+                  Webhooks.getJson(
+                    appId = appId,
+                    channelId = channelId,
+                    web = web,
+                    log = logger)
+                }
+              }
+            }
+          }
+        }
+      }
+    } ~
+    path("webhooks" / formPath ) { web => // /webhooks/<web>.form, handled by Webhooks
+      post {
+        handleExceptions(Common.exceptionHandler) {
+          handleRejections(rejectionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              respondWithMediaType(MediaTypes.`application/json`) {
+                entity(as[FormData]){ formData =>
+                  // (disabled debug output) logger.debug(formData.toString)
+                  complete {
+                    // the request is form data, but the response is marshalled as JSON
+                    import Json4sProtocol._
+
+                    Webhooks.postForm(
+                      appId = appId,
+                      channelId = channelId,
+                      web = web,
+                      data = formData,
+                      eventClient = eventClient,
+                      log = logger,
+                      stats = config.stats,
+                      statsActorRef = statsActorRef)
+                  }
+                }
+              }
+            }
+          }
+        }
+      } ~
+      get {
+        handleExceptions(Common.exceptionHandler) {
+          handleRejections(rejectionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              respondWithMediaType(MediaTypes.`application/json`) {
+                complete {
+                  // the response is marshalled as JSON
+                  import Json4sProtocol._
+
+                  Webhooks.getForm(
+                    appId = appId,
+                    channelId = channelId,
+                    web = web,
+                    log = logger)
+                }
+              }
+            }
+          }
+        }
+      }
+
+    }
+
+  def receive: Actor.Receive = runRoute(route) // the actor's behaviour is the route above
+}
+
+
+
+/* Message asking EventServerActor to bind the HTTP listener to host:port. */
+case class StartServer(host: String, port: Int)
+
+/** Top-level actor of the event server: creates the EventServiceActor child that
+  * serves the routes and binds it to an interface/port when told to [[StartServer]].
+  */
+class EventServerActor(
+    val eventClient: LEvents,
+    val accessKeysClient: AccessKeys,
+    val channelsClient: Channels,
+    val config: EventServerConfig) extends Actor with ActorLogging {
+  // the child actor runs the routes; the storage clients are handed to it as-is
+  val child = context.actorOf(
+    Props(classOf[EventServiceActor], eventClient, accessKeysClient, channelsClient, config),
+    "EventServiceActor")
+  implicit val system = context.system
+
+  def receive: Actor.Receive = {
+    case StartServer(host, portNum) =>
+      IO(Http) ! Http.Bind(child, interface = host, port = portNum)
+    case _: Http.Bound => log.info("Bound received. EventServer is ready.")
+    // include the failed command so that e.g. a bind failure can be diagnosed
+    case failed: Http.CommandFailed => log.error("Command failed: {}", failed.cmd)
+    case other => log.error("Unknown message: {}", other)
+  }
+}
+
+case class EventServerConfig( // settings of the event server
+  ip: String = "localhost", // interface the HTTP listener is bound to
+  port: Int = 7070, // port the HTTP listener is bound to
+  plugins: String = "plugins", // NOTE(review): not read by the code in this file
+  stats: Boolean = false) // true starts StatsActor and enables bookkeeping and /stats.json
+
+/** Wires the storage clients and actors together and runs the event server. */
+object EventServer {
+  /** Starts the actor system and the server actors, then blocks until the system terminates. */
+  def createEventServer(config: EventServerConfig): Unit = {
+    implicit val system = ActorSystem("EventServerSystem")
+
+    // storage clients are resolved before any actor is created
+    val events = Storage.getLEvents()
+    val accessKeys = Storage.getMetaDataAccessKeys()
+    val channels = Storage.getMetaDataChannels()
+    val serverProps = Props(classOf[EventServerActor], events, accessKeys, channels, config)
+    val serverActor = system.actorOf(serverProps, "EventServerActor")
+
+    // helper actors that the routes look up under /user by these names
+    if (config.stats) {
+      system.actorOf(Props[StatsActor], "StatsActor")
+    }
+    system.actorOf(Props[PluginsActor], "PluginsActor")
+
+    serverActor ! StartServer(config.ip, config.port)
+    system.awaitTermination()
+  }
+}
+
+/** Stand-alone launcher: serves on all interfaces (0.0.0.0), port 7070, stats off. */
+object Run {
+  // explicit `: Unit =` replaces the deprecated procedure syntax
+  def main(args: Array[String]): Unit = {
+    EventServer.createEventServer(EventServerConfig(ip = "0.0.0.0", port = 7070))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/EventServerPlugin.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServerPlugin.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPlugin.scala
new file mode 100644
index 0000000..c4918c2
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPlugin.scala
@@ -0,0 +1,33 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.api
+
+/** Contract for event server plugins discovered through `java.util.ServiceLoader`. */
+trait EventServerPlugin {
+  val pluginName: String
+  val pluginDescription: String
+  val pluginType: String // one of the constants in the companion object
+  def start(context: EventServerPluginContext): Unit
+  // called with each incoming event; result type spelled out (was procedure syntax)
+  def process(eventInfo: EventInfo, context: EventServerPluginContext): Unit
+  // serves GET /plugins/<type>/<name>/...; `arguments` are the remaining path segments
+  def handleREST(appId: Int, channelId: Option[Int], arguments: Seq[String]): String
+}
+
+object EventServerPlugin { // the plugin type identifiers
+  val inputBlocker = "inputblocker" // processed synchronously, before an event is inserted
+  val inputSniffer = "inputsniffer" // processed by PluginsActor, after an event is inserted
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala
new file mode 100644
index 0000000..db5743b
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala
@@ -0,0 +1,49 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.api
+
+import java.util.ServiceLoader
+
+import akka.event.LoggingAdapter
+import grizzled.slf4j.Logging
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/** Registry of loaded plugins by type, then name; the accessors return immutable copies. */
+class EventServerPluginContext(
+    val plugins: mutable.Map[String, mutable.Map[String, EventServerPlugin]],
+    val log: LoggingAdapter) {
+  private def ofType(kind: String): Map[String, EventServerPlugin] =
+    plugins.get(kind).fold(Map.empty[String, EventServerPlugin])(_.toMap)
+  def inputBlockers: Map[String, EventServerPlugin] = ofType(EventServerPlugin.inputBlocker)
+  def inputSniffers: Map[String, EventServerPlugin] = ofType(EventServerPlugin.inputSniffer)
+}
+
+object EventServerPluginContext extends Logging {
+  /** Loads all plugins visible to `java.util.ServiceLoader`, grouped by type, then name. */
+  def apply(log: LoggingAdapter): EventServerPluginContext = {
+    val plugins = mutable.Map[String, mutable.Map[String, EventServerPlugin]](
+      EventServerPlugin.inputBlocker -> mutable.Map(),
+      EventServerPlugin.inputSniffer -> mutable.Map())
+    ServiceLoader.load(classOf[EventServerPlugin]) foreach { service =>
+      // an unknown pluginType no longer aborts loading (was NoSuchElementException);
+      // such a plugin is kept in `plugins` but is neither a blocker nor a sniffer
+      plugins.getOrElseUpdate(service.pluginType, mutable.Map()) += service.pluginName -> service
+    }
+    new EventServerPluginContext(plugins, log)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/PluginsActor.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/PluginsActor.scala b/data/src/main/scala/org/apache/predictionio/data/api/PluginsActor.scala
new file mode 100644
index 0000000..e6c1ae8
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/PluginsActor.scala
@@ -0,0 +1,52 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.api
+
+import akka.actor.Actor
+import akka.event.Logging
+
+/** Hosts the input sniffer plugins: feeds them events and answers their REST calls. */
+class PluginsActor() extends Actor {
+  implicit val system = context.system
+  val log = Logging(system, this)
+  val pluginContext = EventServerPluginContext(log)
+  def receive: PartialFunction[Any, Unit] = {
+    case e: EventInfo => pluginContext.inputSniffers.values.foreach(_.process(e, pluginContext))
+    case h: PluginsActor.HandleREST =>
+      try {
+        val sniffer = pluginContext.inputSniffers(h.pluginName)
+        sender() ! sniffer.handleREST(h.appId, h.channelId, h.pluginArgs)
+      } catch {
+        case e: Exception =>
+          // escape quotes, backslashes and control characters so the reply stays valid JSON
+          val msg = String.valueOf(e.getMessage).flatMap {
+            case c if c == '"' || c == '\\' || c < ' ' => "\\" + f"u${c.toInt}%04x"
+            case c => c.toString
+          }
+          sender() ! s"""{"message":"$msg"}"""
+      }
+    case _ => log.error("Unknown message sent to Event Server input sniffer plugin host.")
+  }
+}
+
+object PluginsActor {
+  case class HandleREST( // request to run handleREST on the input sniffer named pluginName
+    pluginName: String,
+    appId: Int,
+    channelId: Option[Int],
+    pluginArgs: Seq[String])
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
new file mode 100644
index 0000000..231d101
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
@@ -0,0 +1,79 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.storage.Event
+
+import spray.http.StatusCode
+
+import scala.collection.mutable.{ HashMap => MHashMap }
+import scala.collection.mutable
+
+import com.github.nscala_time.time.Imports.DateTime
+
+/** Statistics key made of an event's entity type, optional target entity
+  * type and event name.
+  *
+  * @param entityType Entity type of the event
+  * @param targetEntityType Target entity type of the event, if any
+  * @param event Event name
+  */
+case class EntityTypesEvent(
+  entityType: String,
+  targetEntityType: Option[String],
+  event: String) {
+
+  /** Builds the key from the corresponding fields of an [[Event]]. */
+  def this(e: Event) = this(e.entityType, e.targetEntityType, e.event)
+}
+
+/** Generic key/value pair.
+  *
+  * @param key The key
+  * @param value The value associated with the key
+  */
+case class KV[K, V](
+  key: K,
+  value: V)
+
+/** Immutable view of the counters collected by a [[Stats]] instance for one
+  * app.
+  *
+  * @param startTime Start of the period covered
+  * @param endTime End of the period covered, if it has been cut off
+  * @param basic Counts per [[EntityTypesEvent]]
+  * @param statusCode Counts per HTTP status code
+  */
+case class StatsSnapshot(
+  startTime: DateTime,
+  endTime: Option[DateTime],
+  basic: Seq[KV[EntityTypesEvent, Long]],
+  statusCode: Seq[KV[StatusCode, Long]])
+
+
+/** Mutable per-app counters of status codes and entity-type/event
+  * combinations, keyed by `(appId, key)`.
+  *
+  * @param startTime Start of the period these counters cover
+  */
+class Stats(val startTime: DateTime) {
+  private[this] var _endTime: Option[DateTime] = None
+  var statusCodeCount = MHashMap[(Int, StatusCode), Long]().withDefaultValue(0L)
+  var eteCount = MHashMap[(Int, EntityTypesEvent), Long]().withDefaultValue(0L)
+
+  /** Records the end time of the period covered by this instance. */
+  def cutoff(endTime: DateTime): Unit = {
+    _endTime = Some(endTime)
+  }
+
+  /** Increments the status code counter and the entity-type/event counter of
+    * the given app by one.
+    */
+  def update(appId: Int, statusCode: StatusCode, event: Event): Unit = {
+    val codeKey = (appId, statusCode)
+    statusCodeCount(codeKey) = statusCodeCount(codeKey) + 1
+
+    val eteKey = (appId, new EntityTypesEvent(event))
+    eteCount(eteKey) = eteCount(eteKey) + 1
+  }
+
+  /** Returns the entries of `m` whose key belongs to `appId`, with the app ID
+    * stripped from the key.
+    */
+  def extractByAppId[K, V](appId: Int, m: mutable.Map[(Int, K), V])
+  : Seq[KV[K, V]] =
+    m.toSeq.collect {
+      case ((id, key), value) if id == appId => KV(key, value)
+    }
+
+  /** Builds a snapshot of the counters recorded so far for `appId`. */
+  def get(appId: Int): StatsSnapshot = {
+    val basic = extractByAppId(appId, eteCount)
+    val byStatusCode = extractByAppId(appId, statusCodeCount)
+    StatsSnapshot(startTime, _endTime, basic, byStatusCode)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala b/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala
new file mode 100644
index 0000000..a8ed3e7
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala
@@ -0,0 +1,74 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.storage.Event
+
+import spray.http.StatusCode
+
+import akka.actor.Actor
+import akka.event.Logging
+
+import com.github.nscala_time.time.Imports.DateTime
+
+/** Message to StatsActor: record one handled event for the given app.
+  *
+  * @param appId App ID the event belongs to
+  * @param statusCode Status code to count
+  * @param event Event to count
+  */
+case class Bookkeeping(appId: Int, statusCode: StatusCode, event: Event)
+
+/** Message to StatsActor: request the statistics collected for an app.
+  *
+  * @param appId App ID to report on
+  */
+case class GetStats(appId: Int)
+
+/** Actor keeping three sets of statistics: since actor creation, for the
+  * current hour, and for the previous hour.
+  *
+  * It accepts [[Bookkeeping]] messages to update the counters and
+  * [[GetStats]] messages to report them; anything else is logged as an error.
+  */
+class StatsActor extends Actor {
+  implicit val system = context.system
+  val log = Logging(system, this)
+
+  /** Current time truncated to the start of the hour. */
+  def getCurrent: DateTime =
+    DateTime.now
+      .withMinuteOfHour(0)
+      .withSecondOfMinute(0)
+      .withMillisOfSecond(0)
+
+  var longLiveStats = new Stats(DateTime.now)
+  var hourlyStats = new Stats(getCurrent)
+
+  // Start with an empty, already closed window for the previous hour.
+  var prevHourlyStats = new Stats(getCurrent.minusHours(1))
+  prevHourlyStats.cutoff(hourlyStats.startTime)
+
+  /** Counts one event, rolling the hourly window over first if needed. */
+  def bookkeeping(appId: Int, statusCode: StatusCode, event: Event): Unit = {
+    val thisHour = getCurrent
+    // When the hour has changed since the hourly stats were started, the
+    // current window becomes the previous one and a fresh window is opened.
+    if (thisHour != hourlyStats.startTime) {
+      prevHourlyStats = hourlyStats
+      prevHourlyStats.cutoff(thisHour)
+      hourlyStats = new Stats(thisHour)
+    }
+
+    hourlyStats.update(appId, statusCode, event)
+    longLiveStats.update(appId, statusCode, event)
+  }
+
+  def receive: Actor.Receive = {
+    case Bookkeeping(appId, statusCode, event) =>
+      bookkeeping(appId, statusCode, event)
+    case GetStats(appId) =>
+      val report = Map(
+        "time" -> DateTime.now,
+        "currentHour" -> hourlyStats.get(appId),
+        "prevHour" -> prevHourlyStats.get(appId),
+        "longLive" -> longLiveStats.get(appId))
+      sender() ! report
+    case _ =>
+      log.error("Unknown message.")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
new file mode 100644
index 0000000..04ff78f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
@@ -0,0 +1,151 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.webhooks.JsonConnector
+import org.apache.predictionio.data.webhooks.FormConnector
+import org.apache.predictionio.data.webhooks.ConnectorUtil
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventJson4sSupport
+import org.apache.predictionio.data.storage.LEvents
+
+import spray.routing._
+import spray.routing.Directives._
+import spray.http.StatusCodes
+import spray.http.StatusCode
+import spray.http.FormData
+import spray.httpx.Json4sSupport
+
+import org.json4s.Formats
+import org.json4s.DefaultFormats
+import org.json4s.JObject
+
+import akka.event.LoggingAdapter
+import akka.actor.ActorSelection
+
+import scala.concurrent.{ExecutionContext, Future}
+
+
+/** Handlers for the webhooks endpoints.
+  *
+  * Each handler looks up the connector registered for `web` in
+  * [[WebhooksConnectors]] and completes with an HTTP status code together
+  * with a map that is used as the response body. When no connector is
+  * registered for `web`, the result is `NotFound` with an explanatory message.
+  */
+private[predictionio] object Webhooks {
+
+  /** Response returned when no connector is registered for `web`. */
+  private def unsupported(web: String): (StatusCode, Map[String, String]) = {
+    val message = s"webhooks connection for ${web} is not supported."
+    (StatusCodes.NotFound, Map("message" -> message))
+  }
+
+  /** Inserts `event` and, when `stats` is set, reports it to the stats actor. */
+  private def insert(
+    event: Event,
+    appId: Int,
+    channelId: Option[Int],
+    eventClient: LEvents,
+    stats: Boolean,
+    statsActorRef: ActorSelection
+  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
+    eventClient.futureInsert(event, appId, channelId).map { id =>
+      val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
+
+      if (stats) {
+        statsActorRef ! Bookkeeping(appId, result._1, event)
+      }
+      result
+    }
+  }
+
+  /** Converts a JSON webhook payload to an event and stores it.
+    *
+    * @param appId App ID the event is stored under
+    * @param channelId Optional channel ID the event is stored under
+    * @param web Name of the webhooks connector
+    * @param data JSON payload handed to the connector
+    * @param eventClient Event store used for the insert
+    * @param log Logging adapter (currently unused)
+    * @param stats Whether to report the insert to the stats actor
+    * @param statsActorRef Stats actor receiving [[Bookkeeping]] messages
+    * @return `Created` with the new `eventId`, or `NotFound` if `web` has no
+    *         JSON connector
+    */
+  def postJson(
+    appId: Int,
+    channelId: Option[Int],
+    web: String,
+    data: JObject,
+    eventClient: LEvents,
+    log: LoggingAdapter,
+    stats: Boolean,
+    statsActorRef: ActorSelection
+  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
+
+    // The conversion runs inside a Future so that an exception thrown by the
+    // connector fails the returned future instead of escaping to the caller.
+    val eventFuture = Future {
+      WebhooksConnectors.json.get(web).map { connector =>
+        ConnectorUtil.toEvent(connector, data)
+      }
+    }
+
+    eventFuture.flatMap {
+      case Some(event) =>
+        insert(event, appId, channelId, eventClient, stats, statsActorRef)
+      case None =>
+        Future.successful(unsupported(web))
+    }
+  }
+
+  /** Checks whether a JSON connector is registered for `web`.
+    *
+    * @return `OK` if a JSON connector exists for `web`, `NotFound` otherwise
+    */
+  def getJson(
+    appId: Int,
+    channelId: Option[Int],
+    web: String,
+    log: LoggingAdapter
+  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
+    Future.successful {
+      if (WebhooksConnectors.json.contains(web)) {
+        (StatusCodes.OK, Map("message" -> "Ok"))
+      } else {
+        unsupported(web)
+      }
+    }
+  }
+
+  /** Converts a form-encoded webhook payload to an event and stores it.
+    *
+    * @param appId App ID the event is stored under
+    * @param channelId Optional channel ID the event is stored under
+    * @param web Name of the webhooks connector
+    * @param data Form payload; its fields are handed to the connector as a map
+    * @param eventClient Event store used for the insert
+    * @param log Logging adapter (currently unused)
+    * @param stats Whether to report the insert to the stats actor
+    * @param statsActorRef Stats actor receiving [[Bookkeeping]] messages
+    * @return `Created` with the new `eventId`, or `NotFound` if `web` has no
+    *         form connector
+    */
+  def postForm(
+    appId: Int,
+    channelId: Option[Int],
+    web: String,
+    data: FormData,
+    eventClient: LEvents,
+    log: LoggingAdapter,
+    stats: Boolean,
+    statsActorRef: ActorSelection
+  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
+    // See postJson: connector failures must surface as a failed future.
+    val eventFuture = Future {
+      WebhooksConnectors.form.get(web).map { connector =>
+        ConnectorUtil.toEvent(connector, data.fields.toMap)
+      }
+    }
+
+    eventFuture.flatMap {
+      case Some(event) =>
+        insert(event, appId, channelId, eventClient, stats, statsActorRef)
+      case None =>
+        Future.successful(unsupported(web))
+    }
+  }
+
+  /** Checks whether a form connector is registered for `web`.
+    *
+    * @return `OK` if a form connector exists for `web`, `NotFound` otherwise
+    */
+  def getForm(
+    appId: Int,
+    channelId: Option[Int],
+    web: String,
+    log: LoggingAdapter
+  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
+    Future.successful {
+      if (WebhooksConnectors.form.contains(web)) {
+        (StatusCodes.OK, Map("message" -> "Ok"))
+      } else {
+        unsupported(web)
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/WebhooksConnectors.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/WebhooksConnectors.scala b/data/src/main/scala/org/apache/predictionio/data/api/WebhooksConnectors.scala
new file mode 100644
index 0000000..c2578ee
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/WebhooksConnectors.scala
@@ -0,0 +1,34 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.webhooks.JsonConnector
+import org.apache.predictionio.data.webhooks.FormConnector
+
+import org.apache.predictionio.data.webhooks.segmentio.SegmentIOConnector
+import org.apache.predictionio.data.webhooks.mailchimp.MailChimpConnector
+
+/** Registry of the webhooks connectors, keyed by the name used to look them
+  * up.
+  */
+private[predictionio] object WebhooksConnectors {
+
+  /** Connectors consuming JSON payloads. */
+  val json: Map[String, JsonConnector] = Map(
+    "segmentio" -> SegmentIOConnector
+  )
+
+  /** Connectors consuming form payloads. */
+  val form: Map[String, FormConnector] = Map(
+    "mailchimp" -> MailChimpConnector
+  )
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/package.scala b/data/src/main/scala/org/apache/predictionio/data/package.scala
new file mode 100644
index 0000000..9284787
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/package.scala
@@ -0,0 +1,21 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio
+
+/** Data access layer for PredictionIO and for any engines running on top of
+  * PredictionIO.
+  */
+package object data {
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/AccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/AccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/AccessKeys.scala
new file mode 100644
index 0000000..3285de9
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/AccessKeys.scala
@@ -0,0 +1,71 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import java.security.SecureRandom
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.commons.codec.binary.Base64
+
+/** :: DeveloperApi ::
+  * An access key together with the app ID it belongs to and the list of
+  * event names allowed for it
+  *
+  * @param key Access key
+  * @param appid App ID
+  * @param events List of allowed events for this particular app key
+  * @group Meta Data
+  */
+@DeveloperApi
+case class AccessKey(key: String, appid: Int, events: Seq[String])
+
+/** :: DeveloperApi ::
+  * Base trait of the [[AccessKey]] data access object
+  *
+  * @group Meta Data
+  */
+@DeveloperApi
+trait AccessKeys {
+  /** Insert a new [[AccessKey]]. If the key field is empty, a key will be
+    * generated.
+    *
+    * @param k Access key to insert
+    */
+  def insert(k: AccessKey): Option[String]
+
+  /** Get an [[AccessKey]] by key
+    *
+    * @param k Key to look up
+    */
+  def get(k: String): Option[AccessKey]
+
+  /** Get all [[AccessKey]]s */
+  def getAll(): Seq[AccessKey]
+
+  /** Get all [[AccessKey]]s for a particular app ID
+    *
+    * @param appid App ID to look up
+    */
+  def getByAppid(appid: Int): Seq[AccessKey]
+
+  /** Update an [[AccessKey]]
+    *
+    * @param k Access key carrying the new values
+    */
+  def update(k: AccessKey): Unit
+
+  /** Delete an [[AccessKey]]
+    *
+    * @param k Key of the access key to delete
+    */
+  def delete(k: String): Unit
+
+  /** Default implementation of key generation: 48 random bytes encoded as a
+    * URL-safe Base64 string.
+    *
+    * NOTE(review): `SecureRandom.getInstanceStrong` may wait for entropy on
+    * some platforms -- confirm this is acceptable for callers.
+    */
+  def generateKey: String = {
+    val keyBytes = new Array[Byte](48)
+    SecureRandom.getInstanceStrong.nextBytes(keyBytes)
+    Base64.encodeBase64URLSafeString(keyBytes)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Apps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Apps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Apps.scala
new file mode 100644
index 0000000..b68e1b6
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Apps.scala
@@ -0,0 +1,58 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+
+/** :: DeveloperApi ::
+  * An app, identified by its ID and name
+  *
+  * @param id ID of the app.
+  * @param name Name of the app.
+  * @param description Long description of the app.
+  * @group Meta Data
+  */
+@DeveloperApi
+case class App(id: Int, name: String, description: Option[String])
+
+/** :: DeveloperApi ::
+  * Base trait of the [[App]] data access object
+  *
+  * @group Meta Data
+  */
+@DeveloperApi
+trait Apps {
+  /** Insert a new [[App]]. Returns a generated app ID if the supplied app ID is 0.
+    *
+    * @param app App to insert
+    */
+  def insert(app: App): Option[Int]
+
+  /** Get an [[App]] by app ID
+    *
+    * @param id App ID to look up
+    */
+  def get(id: Int): Option[App]
+
+  /** Get an [[App]] by app name
+    *
+    * @param name App name to look up
+    */
+  def getByName(name: String): Option[App]
+
+  /** Get all [[App]]s */
+  def getAll(): Seq[App]
+
+  /** Update an [[App]]
+    *
+    * @param app App carrying the new values
+    */
+  def update(app: App): Unit
+
+  /** Delete an [[App]]
+    *
+    * @param id ID of the app to delete
+    */
+  def delete(id: Int): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/BiMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/BiMap.scala b/data/src/main/scala/org/apache/predictionio/data/storage/BiMap.scala
new file mode 100644
index 0000000..ad845b3
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/BiMap.scala
@@ -0,0 +1,164 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import scala.collection.immutable.HashMap
+
+import org.apache.spark.rdd.RDD
+
+/** Immutable bi-directional map.
+  *
+  * Wraps a map from `K` to `V` and exposes the reversed map through
+  * [[inverse]]. Construction fails if two keys share the same value, because
+  * the reversed map could then not hold every entry.
+  *
+  * @param m Forward mapping
+  * @param i Already built inverse, supplied only when this instance is itself
+  *          created as the inverse of another BiMap
+  */
+class BiMap[K, V] private[predictionio] (
+  private val m: Map[K, V],
+  private val i: Option[BiMap[V, K]] = None
+  ) extends Serializable {
+
+  // NOTE: make inverse's inverse point back to current BiMap
+  val inverse: BiMap[V, K] = i.getOrElse {
+    val rev = m.map(_.swap)
+    // duplicated values collapse into one key of rev, shrinking it
+    require((rev.size == m.size),
+      "Failed to create reversed map. Cannot have duplicated values.")
+    new BiMap(rev, Some(this))
+  }
+
+  /** Optionally returns the value associated with `k`. */
+  def get(k: K): Option[V] = m.get(k)
+
+  /** Returns the value associated with `k`, or `default` if there is none. */
+  def getOrElse(k: K, default: => V): V = m.getOrElse(k, default)
+
+  /** Tests whether `k` is a key of this map. */
+  def contains(k: K): Boolean = m.contains(k)
+
+  /** Returns the value associated with `k`, delegating to the underlying
+    * map's `apply`.
+    */
+  def apply(k: K): V = m.apply(k)
+
+  /** Converts to a map.
+    * @return a map of type immutable.Map[K, V]
+    */
+  def toMap: Map[K, V] = m
+
+  /** Converts to a sequence.
+    * @return a sequence containing all elements of this map
+    */
+  def toSeq: Seq[(K, V)] = m.toSeq
+
+  /** Number of entries in this map. */
+  def size: Int = m.size
+
+  /** Returns a new BiMap built from the first `n` entries of the underlying
+    * map.
+    */
+  def take(n: Int): BiMap[K, V] = BiMap(m.take(n))
+
+  override def toString: String = m.toString
+}
+
+/** Factory methods for the BiMap class. */
+object BiMap {
+
+  /** Create a BiMap from an existing map.
+    * @param x the forward mapping
+    * @return a BiMap wrapping x
+    */
+  def apply[K, V](x: Map[K, V]): BiMap[K, V] = new BiMap(x)
+
+  /** Create a BiMap[String, Long] from a set of String. The Long index starts
+    * from 0.
+    * @param keys a set of String
+    * @return a String to Long BiMap
+    */
+  def stringLong(keys: Set[String]): BiMap[String, Long] = {
+    val indexed = keys.toSeq.zipWithIndex.map { case (key, idx) => (key, idx.toLong) }
+    new BiMap(HashMap(indexed : _*))
+  }
+
+  /** Create a BiMap[String, Long] from an array of String.
+    * NOTE: the array cannot have duplicated elements.
+    * The Long index starts from 0.
+    * @param keys an array of String
+    * @return a String to Long BiMap
+    */
+  def stringLong(keys: Array[String]): BiMap[String, Long] = {
+    val indexed = keys.zipWithIndex.map { case (key, idx) => (key, idx.toLong) }
+    new BiMap(HashMap(indexed : _*))
+  }
+
+  /** Create a BiMap[String, Long] from RDD[String]. The Long index starts
+    * from 0. The distinct keys are collected before indexing.
+    * @param keys RDD of String
+    * @return a String to Long BiMap
+    */
+  def stringLong(keys: RDD[String]): BiMap[String, Long] = {
+    val distinctKeys = keys.distinct.collect
+    stringLong(distinctKeys)
+  }
+
+  /** Create a BiMap[String, Int] from a set of String. The Int index starts
+    * from 0.
+    * @param keys a set of String
+    * @return a String to Int BiMap
+    */
+  def stringInt(keys: Set[String]): BiMap[String, Int] = {
+    val indexed = keys.toSeq.zipWithIndex
+    new BiMap(HashMap(indexed : _*))
+  }
+
+  /** Create a BiMap[String, Int] from an array of String.
+    * NOTE: the array cannot have duplicated elements.
+    * The Int index starts from 0.
+    * @param keys an array of String
+    * @return a String to Int BiMap
+    */
+  def stringInt(keys: Array[String]): BiMap[String, Int] = {
+    val indexed = keys.zipWithIndex
+    new BiMap(HashMap(indexed : _*))
+  }
+
+  /** Create a BiMap[String, Int] from RDD[String]. The Int index starts
+    * from 0. The distinct keys are collected before indexing.
+    * @param keys RDD of String
+    * @return a String to Int BiMap
+    */
+  def stringInt(keys: RDD[String]): BiMap[String, Int] = {
+    val distinctKeys = keys.distinct.collect
+    stringInt(distinctKeys)
+  }
+
+  /** Shared implementation of the stringDouble variants: pairs every key with
+    * its position converted to Double.
+    */
+  private[this] def stringDoubleImpl(keys: Seq[String])
+  : BiMap[String, Double] = {
+    val indexed = keys.zipWithIndex.map { case (key, idx) => (key, idx.toDouble) }
+    new BiMap(HashMap(indexed : _*))
+  }
+
+  /** Create a BiMap[String, Double] from a set of String. The Double index
+    * starts from 0.
+    * @param keys a set of String
+    * @return a String to Double BiMap
+    */
+  def stringDouble(keys: Set[String]): BiMap[String, Double] =
+    stringDoubleImpl(keys.toSeq)
+
+  /** Create a BiMap[String, Double] from an array of String.
+    * NOTE: the array cannot have duplicated elements.
+    * The Double index starts from 0.
+    * @param keys an array of String
+    * @return a String to Double BiMap
+    */
+  def stringDouble(keys: Array[String]): BiMap[String, Double] =
+    stringDoubleImpl(keys.toSeq)
+
+  /** Create a BiMap[String, Double] from RDD[String]. The Double index starts
+    * from 0. The distinct keys are collected before indexing.
+    * @param keys RDD of String
+    * @return a String to Double BiMap
+    */
+  def stringDouble(keys: RDD[String]): BiMap[String, Double] = {
+    val distinctKeys = keys.distinct.collect
+    stringDoubleImpl(distinctKeys)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Channels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Channels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Channels.scala
new file mode 100644
index 0000000..e602e1e
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Channels.scala
@@ -0,0 +1,79 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+
+/** :: DeveloperApi ::
+  * Stores mapping of channel IDs, names and app ID
+  *
+  * Construction fails with an `IllegalArgumentException` when the name does
+  * not satisfy `Channel.isValidName`.
+  *
+  * @param id ID of the channel
+  * @param name Name of the channel (must be unique within the same app)
+  * @param appid ID of the app which this channel belongs to
+  * @group Meta Data
+  */
+@DeveloperApi
+case class Channel(
+  id: Int,
+  name: String, // must be unique within the same app
+  appid: Int
+) {
+  // the s interpolator is required for the offending name and the constraint
+  // text to actually appear in the message
+  require(Channel.isValidName(name),
+    s"Invalid channel name: ${name}. ${Channel.nameConstraint}")
+}
+
+/** :: DeveloperApi ::
+  * Companion object of [[Channel]]
+  *
+  * @group Meta Data
+  */
+@DeveloperApi
+object Channel {
+  /** Examine whether the supplied channel name is valid. A valid channel name
+    * must consist of 1 to 16 alphanumeric and '-' characters.
+    *
+    * @param s Channel name to examine
+    * @return true if channel name is valid, false otherwise
+    */
+  def isValidName(s: String): Boolean =
+    // note: update nameConstraint if this rule is changed
+    s.matches("^[a-zA-Z0-9-]{1,16}$")
+
+  /** Human-readable description of the naming rule, for consistent error
+    * message display
+    */
+  val nameConstraint: String =
+    "Only alphanumeric and - characters are allowed and max length is 16."
+}
+
+/** :: DeveloperApi ::
+  * Base trait of the [[Channel]] data access object
+  *
+  * @group Meta Data
+  */
+@DeveloperApi
+trait Channels {
+  /** Insert a new [[Channel]]. Returns a generated channel ID if original ID is 0.
+    *
+    * @param channel Channel to insert
+    */
+  def insert(channel: Channel): Option[Int]
+
+  /** Get a [[Channel]] by channel ID
+    *
+    * @param id Channel ID to look up
+    */
+  def get(id: Int): Option[Channel]
+
+  /** Get all [[Channel]]s by app ID
+    *
+    * @param appid App ID to look up
+    */
+  def getByAppid(appid: Int): Seq[Channel]
+
+  /** Delete a [[Channel]]
+    *
+    * @param id ID of the channel to delete
+    */
+  def delete(id: Int): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/DataMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/DataMap.scala b/data/src/main/scala/org/apache/predictionio/data/storage/DataMap.scala
new file mode 100644
index 0000000..93b6f51
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/DataMap.scala
@@ -0,0 +1,241 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.json4s._
+import org.json4s.native.JsonMethods.parse
+
+import scala.collection.GenTraversableOnce
+import scala.collection.JavaConversions
+
+/** Exception class for [[DataMap]]
+  *
+  * @param msg Error message
+  * @param cause Underlying exception; null when created through the
+  *              single-argument constructor
+  * @group Event Data
+  */
+case class DataMapException(msg: String, cause: Exception) extends Exception(msg, cause) {
+
+  /** Creates an exception that carries a message only. */
+  def this(msg: String) = this(msg, null)
+}
+
+/** A DataMap stores properties of the event or entity. Internally it is a Map
+  * whose keys are property names and values are corresponding JSON values
+  * respectively. Use the [[get]] method to retrieve the value of a mandatory
+  * property or use [[getOpt]] to retrieve the value of an optional property.
+  *
+  * @param fields Map of property name to JValue
+  * @group Event Data
+  */
+class DataMap (
+  val fields: Map[String, JValue]
+) extends Serializable {
+  @transient lazy implicit private val formats = DefaultFormats +
+    new DateTimeJson4sSupport.Serializer
+
+  /** Check the existence of a required property name. Throw an exception if
+    * it does not exist.
+    *
+    * @param name The property name
+    */
+  def require(name: String): Unit = {
+    val present = fields.contains(name)
+    if (!present) {
+      throw new DataMapException(s"The field $name is required.")
+    }
+  }
+
+  /** Check if this DataMap contains a specific property.
+    *
+    * @param name The property name
+    * @return Return true if the property exists, else false.
+    */
+  def contains(name: String): Boolean = fields.contains(name)
+
+  /** Get the value of a mandatory property. Exception is thrown if the property
+    * does not exist or if its value is JSON null.
+    *
+    * @tparam T The type of the property value
+    * @param name The property name
+    * @return Return the property value of type T
+    */
+  def get[T: Manifest](name: String): T = {
+    require(name)
+    val value = fields(name)
+    if (JNull == value) {
+      throw new DataMapException(s"The required field $name cannot be null.")
+    } else {
+      value.extract[T]
+    }
+  }
+
+  /** Get the value of an optional property. Return None if the property does
+    * not exist or if its value is JSON null.
+    *
+    * @tparam T The type of the property value
+    * @param name The property name
+    * @return Return the property value of type Option[T]
+    */
+  def getOpt[T: Manifest](name: String): Option[T] = {
+    for {
+      json <- fields.get(name)
+      value <- json.extract[Option[T]]
+    } yield value
+  }
+
+  /** Get the value of an optional property. Return default value if the
+    * property does not exist.
+    *
+    * @tparam T The type of the property value
+    * @param name The property name
+    * @param default The default property value of type T
+    * @return Return the property value of type T
+    */
+  def getOrElse[T: Manifest](name: String, default: T): T = {
+    getOpt[T](name) match {
+      case Some(value) => value
+      case None => default
+    }
+  }
+
+  /** Java-friendly method for getting the value of a property. Return null if the
+    * property does not exist or if its value is JSON null.
+    *
+    * @tparam T The type of the property value
+    * @param name The property name
+    * @param clazz The class of the type of the property value
+    * @return Return the property value of type T
+    */
+  def get[T](name: String, clazz: java.lang.Class[T]): T = {
+    // Java callers cannot supply a Manifest, so one is built from the class.
+    val mf = new Manifest[T] {
+      override def erasure: Class[_] = clazz
+      override def runtimeClass: Class[_] = clazz
+    }
+
+    fields.get(name) match {
+      case Some(value) if JNull != value => value.extract[T](formats, mf)
+      case _ => null.asInstanceOf[T]
+    }
+  }
+
+  /** Java-friendly method for getting a list of values of a property. Return null if the
+    * property does not exist or if its value is JSON null.
+    *
+    * @param name The property name
+    * @return Return the list of property values
+    */
+  def getStringList(name: String): java.util.List[String] = {
+    fields.get(name) match {
+      case Some(value) if JNull != value =>
+        val strings = value.extract[List[String]](formats, manifest[List[String]])
+        JavaConversions.seqAsJavaList(strings)
+      case _ => null
+    }
+  }
+
+  /** Return a new DataMap with elements containing elements from the left hand
+    * side operand followed by elements from the right hand side operand.
+    *
+    * @param that Right hand side DataMap
+    * @return A new DataMap
+    */
+  def ++ (that: DataMap): DataMap = {
+    val merged = this.fields ++ that.fields
+    DataMap(merged)
+  }
+
+  /** Creates a new DataMap from this DataMap by removing all elements of
+    * another collection.
+    *
+    * @param that A collection containing the removed property names
+    * @return A new DataMap
+    */
+  def -- (that: GenTraversableOnce[String]): DataMap = {
+    val remaining = this.fields -- that
+    DataMap(remaining)
+  }
+
+  /** Tests whether the DataMap is empty.
+    *
+    * @return true if the DataMap is empty, false otherwise.
+    */
+  def isEmpty: Boolean = fields.isEmpty
+
+  /** Collects all property names of this DataMap in a set.
+    *
+    * @return a set containing all property names of this DataMap.
+    */
+  def keySet: Set[String] = fields.keySet
+
+  /** Converts this DataMap to a List.
+    *
+    * @return a list of (property name, JSON value) tuples.
+    */
+  def toList(): List[(String, JValue)] = fields.toList
+
+  /** Converts this DataMap to a JObject.
+    *
+    * @return the JObject initialized by this DataMap.
+    */
+  def toJObject(): JObject = JObject(toList())
+
+  /** Converts this DataMap to case class of type T.
+    *
+    * @return the object of type T.
+    */
+  def extract[T: Manifest]: T = toJObject().extract[T]
+
+  override def toString: String = s"DataMap($fields)"
+
+  override def hashCode: Int = 41 + fields.hashCode
+
+  /** Two DataMaps are equal when their underlying field maps are equal. */
+  override def equals(other: Any): Boolean = other match {
+    case that: DataMap =>
+      that.canEqual(this) && this.fields.equals(that.fields)
+    case _ =>
+      false
+  }
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[DataMap]
+}
+
+/** Companion object of the [[DataMap]] class
+  *
+  * @group Event Data
+  */
+object DataMap {
+  /** Create an empty DataMap
+    * @return an empty DataMap
+    */
+  def apply(): DataMap = new DataMap(Map[String, JValue]())
+
+  /** Create a DataMap from a Map of String to JValue
+    * @param fields a Map of String to JValue
+    * @return a new DataMap initialized by fields
+    */
+  def apply(fields: Map[String, JValue]): DataMap = new DataMap(fields)
+
+  /** Create a DataMap from a JObject
+    * @param jObj JObject; a null reference yields an empty DataMap
+    * @return a new DataMap initialized by a JObject
+    */
+  def apply(jObj: JObject): DataMap = {
+    if (jObj != null) {
+      new DataMap(jObj.obj.toMap)
+    } else {
+      apply()
+    }
+  }
+
+  /** Create a DataMap from a JSON String
+    * @param js JSON String. eg """{ "a": 1, "b": "foo" }""". The parsed value
+    *           is cast to JObject, so the string must hold a JSON object.
+    * @return a new DataMap initialized by a JSON string
+    */
+  def apply(js: String): DataMap = {
+    val parsed = parse(js).asInstanceOf[JObject]
+    apply(parsed)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/DateTimeJson4sSupport.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/DateTimeJson4sSupport.scala b/data/src/main/scala/org/apache/predictionio/data/storage/DateTimeJson4sSupport.scala
new file mode 100644
index 0000000..b3789a4
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/DateTimeJson4sSupport.scala
@@ -0,0 +1,47 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.data.{Utils => DataUtils}
+import org.joda.time.DateTime
+import org.json4s._
+
+/** :: DeveloperApi ::
+  * JSON4S serializer for Joda-Time
+  *
+  * Both directions delegate the actual text conversion to the helpers in
+  * `org.apache.predictionio.data.Utils` (imported here as `DataUtils`).
+  *
+  * @group Common
+  */
+@DeveloperApi
+object DateTimeJson4sSupport {
+
+  /** json4s formats used when extracting the String out of a JValue below. */
+  @transient implicit lazy val formats: DefaultFormats.type = DefaultFormats
+
+  /** Serialize DateTime to JValue: a JString holding
+    * `DataUtils.dateTimeToString` of the value. Only defined for DateTime.
+    */
+  def serializeToJValue: PartialFunction[Any, JValue] = {
+    case dateTime: DateTime =>
+      val rendered = DataUtils.dateTimeToString(dateTime)
+      JString(rendered)
+  }
+
+  /** Deserialize JValue to DateTime: the value is extracted as a String and
+    * handed to `DataUtils.stringToDateTime`.
+    */
+  def deserializeFromJValue: PartialFunction[JValue, DateTime] = {
+    case json: JValue =>
+      val text = json.extract[String]
+      DataUtils.stringToDateTime(text)
+  }
+
+  /** Custom JSON4S serializer for Joda-Time, built from the two partial
+    * functions above.
+    */
+  class Serializer extends CustomSerializer[DateTime](_ =>
+    (deserializeFromJValue, serializeToJValue))
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala
new file mode 100644
index 0000000..bc71f3f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala
@@ -0,0 +1,177 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import com.github.nscala_time.time.Imports._
+import org.apache.predictionio.annotation.DeveloperApi
+import org.json4s._
+
+/** :: DeveloperApi ::
+  * Stores parameters, model, and other information for each engine instance
+  *
+  * The four `*Params` fields are plain Strings here. NOTE(review): they
+  * presumably hold serialized (JSON) parameters -- confirm against the code
+  * that writes them.
+  *
+  * @param id Engine instance ID.
+  * @param status Status of the engine instance.
+  * @param startTime Start time of the training/evaluation.
+  * @param endTime End time of the training/evaluation.
+  * @param engineId Engine ID of the instance.
+  * @param engineVersion Engine version of the instance.
+  * @param engineVariant Engine variant ID of the instance.
+  * @param engineFactory Engine factory class for the instance.
+  * @param batch A batch label of the engine instance.
+  * @param env The environment in which the instance was created, as a
+  *            String-to-String map.
+  * @param sparkConf Custom Spark configuration of the instance, as a
+  *                  String-to-String map.
+  * @param dataSourceParams Data source parameters of the instance.
+  * @param preparatorParams Preparator parameters of the instance.
+  * @param algorithmsParams Algorithms parameters of the instance.
+  * @param servingParams Serving parameters of the instance.
+  * @group Meta Data
+  */
+@DeveloperApi
+case class EngineInstance(
+  id: String,
+  status: String,
+  startTime: DateTime,
+  endTime: DateTime,
+  engineId: String,
+  engineVersion: String,
+  engineVariant: String,
+  engineFactory: String,
+  batch: String,
+  env: Map[String, String],
+  sparkConf: Map[String, String],
+  dataSourceParams: String,
+  preparatorParams: String,
+  algorithmsParams: String,
+  servingParams: String)
+
+/** :: DeveloperApi ::
+  * Base trait of the [[EngineInstance]] data access object
+  *
+  * All members are abstract; storage backends supply the implementations.
+  *
+  * @group Meta Data
+  */
+@DeveloperApi
+trait EngineInstances {
+  /** Insert a new [[EngineInstance]]
+    *
+    * @param i the instance to insert
+    * @return a String; NOTE(review): presumably the ID under which the
+    *         instance was stored -- confirm against the implementations
+    */
+  def insert(i: EngineInstance): String
+
+  /** Get an [[EngineInstance]] by ID
+    *
+    * @param id ID of the instance to look up
+    * @return the matching instance, if any
+    */
+  def get(id: String): Option[EngineInstance]
+
+  /** Get all [[EngineInstance]]s */
+  def getAll(): Seq[EngineInstance]
+
+  /** Get an instance that has started training the latest and has trained to
+    * completion
+    *
+    * @param engineId engine ID to look for
+    * @param engineVersion engine version to look for
+    * @param engineVariant engine variant ID to look for
+    * @return the matching instance, if any
+    */
+  def getLatestCompleted(
+      engineId: String,
+      engineVersion: String,
+      engineVariant: String): Option[EngineInstance]
+
+  /** Get all instances that have trained to completion
+    *
+    * @param engineId engine ID to look for
+    * @param engineVersion engine version to look for
+    * @param engineVariant engine variant ID to look for
+    */
+  def getCompleted(
+    engineId: String,
+    engineVersion: String,
+    engineVariant: String): Seq[EngineInstance]
+
+  /** Update an [[EngineInstance]]
+    *
+    * @param i the instance carrying the new values
+    */
+  def update(i: EngineInstance): Unit
+
+  /** Delete an [[EngineInstance]]
+    *
+    * @param id ID of the instance to delete
+    */
+  def delete(id: String): Unit
+}
+
+/** :: DeveloperApi ::
+  * JSON4S serializer for [[EngineInstance]]
+  *
+  * Reading starts from an instance holding empty strings, empty maps and the
+  * current time for both timestamps, then overwrites one property per
+  * recognised JSON field. Unknown fields are ignored, as are string-typed
+  * properties whose JSON value is not a string. Writing emits every property
+  * in declaration order, with both timestamps rendered through `toString`.
+  *
+  * @group Meta Data
+  */
+@DeveloperApi
+class EngineInstanceSerializer
+    extends CustomSerializer[EngineInstance](
+  _ => ({
+    case JObject(jsonFields) =>
+      implicit val formats = DefaultFormats
+      // Values used for every property the JSON object does not supply.
+      val blank = EngineInstance(
+        id = "",
+        status = "",
+        startTime = DateTime.now,
+        endTime = DateTime.now,
+        engineId = "",
+        engineVersion = "",
+        engineVariant = "",
+        engineFactory = "",
+        batch = "",
+        env = Map(),
+        sparkConf = Map(),
+        dataSourceParams = "",
+        preparatorParams = "",
+        algorithmsParams = "",
+        servingParams = "")
+      jsonFields.foldLeft(blank) {
+        case (acc, JField("id", JString(v))) => acc.copy(id = v)
+        case (acc, JField("status", JString(v))) => acc.copy(status = v)
+        case (acc, JField("startTime", JString(v))) =>
+          acc.copy(startTime = Utils.stringToDateTime(v))
+        case (acc, JField("endTime", JString(v))) =>
+          acc.copy(endTime = Utils.stringToDateTime(v))
+        case (acc, JField("engineId", JString(v))) => acc.copy(engineId = v)
+        case (acc, JField("engineVersion", JString(v))) =>
+          acc.copy(engineVersion = v)
+        case (acc, JField("engineVariant", JString(v))) =>
+          acc.copy(engineVariant = v)
+        case (acc, JField("engineFactory", JString(v))) =>
+          acc.copy(engineFactory = v)
+        case (acc, JField("batch", JString(v))) => acc.copy(batch = v)
+        // The two map-valued fields accept any JSON value and defer to
+        // json4s extraction.
+        case (acc, JField("env", json)) =>
+          acc.copy(env = Extraction.extract[Map[String, String]](json))
+        case (acc, JField("sparkConf", json)) =>
+          acc.copy(sparkConf = Extraction.extract[Map[String, String]](json))
+        case (acc, JField("dataSourceParams", JString(v))) =>
+          acc.copy(dataSourceParams = v)
+        case (acc, JField("preparatorParams", JString(v))) =>
+          acc.copy(preparatorParams = v)
+        case (acc, JField("algorithmsParams", JString(v))) =>
+          acc.copy(algorithmsParams = v)
+        case (acc, JField("servingParams", JString(v))) =>
+          acc.copy(servingParams = v)
+        case (acc, _) => acc
+      }
+  },
+  {
+    case instance: EngineInstance =>
+      JObject(List(
+        JField("id", JString(instance.id)),
+        JField("status", JString(instance.status)),
+        JField("startTime", JString(instance.startTime.toString)),
+        JField("endTime", JString(instance.endTime.toString)),
+        JField("engineId", JString(instance.engineId)),
+        JField("engineVersion", JString(instance.engineVersion)),
+        JField("engineVariant", JString(instance.engineVariant)),
+        JField("engineFactory", JString(instance.engineFactory)),
+        JField("batch", JString(instance.batch)),
+        JField("env", Extraction.decompose(instance.env)(DefaultFormats)),
+        JField("sparkConf", Extraction.decompose(instance.sparkConf)(DefaultFormats)),
+        JField("dataSourceParams", JString(instance.dataSourceParams)),
+        JField("preparatorParams", JString(instance.preparatorParams)),
+        JField("algorithmsParams", JString(instance.algorithmsParams)),
+        JField("servingParams", JString(instance.servingParams))))
+  }
+))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala
new file mode 100644
index 0000000..372a2e7
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala
@@ -0,0 +1,117 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.json4s._
+
+/** :: DeveloperApi ::
+  * Provides a way to discover engines by ID and version in a distributed
+  * environment
+  *
+  * @param id Unique identifier of an engine.
+  * @param version Engine version string.
+  * @param name A short and descriptive name for the engine.
+  * @param description A long description of the engine; optional.
+  * @param files Paths to engine files.
+  * @param engineFactory Engine's factory class name.
+  * @group Meta Data
+  */
+@DeveloperApi
+case class EngineManifest(
+  id: String,
+  version: String,
+  name: String,
+  description: Option[String],
+  files: Seq[String],
+  engineFactory: String)
+
+/** :: DeveloperApi ::
+  * Base trait of the [[EngineManifest]] data access object
+  *
+  * All members are abstract; storage backends supply the implementations.
+  *
+  * @group Meta Data
+  */
+@DeveloperApi
+trait EngineManifests {
+  /** Inserts an [[EngineManifest]]
+    *
+    * @param engineManifest the manifest to insert
+    */
+  def insert(engineManifest: EngineManifest): Unit
+
+  /** Get an [[EngineManifest]] by its ID and version
+    *
+    * @param id engine ID to look for
+    * @param version engine version to look for
+    * @return the matching manifest, if any
+    */
+  def get(id: String, version: String): Option[EngineManifest]
+
+  /** Get all [[EngineManifest]]s */
+  def getAll(): Seq[EngineManifest]
+
+  /** Updates an [[EngineManifest]]
+    *
+    * @param engineInfo the manifest carrying the new values
+    * @param upsert defaults to false. NOTE(review): presumably requests an
+    *               insert when no matching manifest exists -- confirm
+    *               against the implementations
+    */
+  def update(engineInfo: EngineManifest, upsert: Boolean = false): Unit
+
+  /** Delete an [[EngineManifest]] by its ID and version
+    *
+    * @param id engine ID of the manifest to delete
+    * @param version engine version of the manifest to delete
+    */
+  def delete(id: String, version: String): Unit
+}
+
+/** :: DeveloperApi ::
+  * JSON4S serializer for [[EngineManifest]]
+  *
+  * Reading starts from a manifest with empty strings, no description and no
+  * files, then overwrites one property per recognised JSON field; anything
+  * else is ignored. Writing emits every property, using `JNothing` for an
+  * absent description.
+  *
+  * @group Meta Data
+  */
+@DeveloperApi
+class EngineManifestSerializer
+    extends CustomSerializer[EngineManifest](_ => (
+  {
+    case JObject(jsonFields) =>
+      // Values used for every property the JSON object does not supply.
+      val blank = EngineManifest(
+        id = "",
+        version = "",
+        name = "",
+        description = None,
+        files = Nil,
+        engineFactory = "")
+      jsonFields.foldLeft(blank) {
+        case (acc, JField("id", JString(v))) => acc.copy(id = v)
+        case (acc, JField("version", JString(v))) => acc.copy(version = v)
+        case (acc, JField("name", JString(v))) => acc.copy(name = v)
+        case (acc, JField("description", JString(v))) =>
+          acc.copy(description = Some(v))
+        case (acc, JField("files", JArray(entries))) =>
+          // Non-string array entries are kept as empty strings, not dropped.
+          val paths = entries.map {
+            case JString(path) => path
+            case _ => ""
+          }
+          acc.copy(files = paths)
+        case (acc, JField("engineFactory", JString(v))) =>
+          acc.copy(engineFactory = v)
+        case (acc, _) => acc
+      }
+  },
+  {
+    case manifest: EngineManifest =>
+      val descriptionJson: JValue = manifest.description match {
+        case Some(text) => JString(text)
+        case None => JNothing
+      }
+      val filesJson = JArray(manifest.files.map(JString(_)).toList)
+      JObject(List(
+        JField("id", JString(manifest.id)),
+        JField("version", JString(manifest.version)),
+        JField("name", JString(manifest.name)),
+        JField("description", descriptionJson),
+        JField("files", filesJson),
+        JField("engineFactory", JString(manifest.engineFactory))))
+  }
+))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EntityMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EntityMap.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EntityMap.scala
new file mode 100644
index 0000000..aa7224c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EntityMap.scala
@@ -0,0 +1,98 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.Experimental
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/**
+ * :: Experimental ::
+ * Serializable pairing of entity IDs (Strings) with Long indices, backed by
+ * the `idToIx` BiMap and its inverse. Every method delegates to one of the
+ * two BiMaps.
+ *
+ * @param idToIx BiMap from entity ID to index
+ */
+@Experimental
+class EntityIdIxMap(val idToIx: BiMap[String, Long]) extends Serializable {
+
+  /** Inverse of `idToIx`: index to entity ID. */
+  val ixToId: BiMap[Long, String] = idToIx.inverse
+
+  /** Index of `id`, via `idToIx(id)`.
+    * NOTE(review): the result for an unknown ID is whatever `BiMap.apply`
+    * does (presumably throws) -- confirm.
+    */
+  def apply(id: String): Long = idToIx(id)
+
+  /** Entity ID at index `ix`, via `ixToId(ix)`; same caveat as above for an
+    * unknown index.
+    */
+  def apply(ix: Long): String = ixToId(ix)
+
+  /** Whether `idToIx` contains the entity ID. */
+  def contains(id: String): Boolean = idToIx.contains(id)
+
+  /** Whether `ixToId` contains the index. */
+  def contains(ix: Long): Boolean = ixToId.contains(ix)
+
+  /** Index of `id` as an Option, via `idToIx.get`. */
+  def get(id: String): Option[Long] = idToIx.get(id)
+
+  /** Entity ID at index `ix` as an Option, via `ixToId.get`. */
+  def get(ix: Long): Option[String] = ixToId.get(ix)
+
+  /** Index of `id`, or `default` (a by-name argument) as decided by
+    * `idToIx.getOrElse`.
+    */
+  def getOrElse(id: String, default: => Long): Long =
+    idToIx.getOrElse(id, default)
+
+  /** Entity ID at index `ix`, or `default` (a by-name argument) as decided by
+    * `ixToId.getOrElse`.
+    */
+  def getOrElse(ix: Long, default: => String): String =
+    ixToId.getOrElse(ix, default)
+
+  /** The ID-to-index mapping as a plain Map. */
+  def toMap: Map[String, Long] = idToIx.toMap
+
+  /** Size reported by `idToIx`, returned as a Long. */
+  def size: Long = idToIx.size
+
+  /** New map built from `idToIx.take(n)`.
+    * NOTE(review): which `n` entries are kept depends on `BiMap.take` --
+    * confirm its ordering before relying on it.
+    */
+  def take(n: Int): EntityIdIxMap = new EntityIdIxMap(idToIx.take(n))
+
+  /** String form of `idToIx`. */
+  override def toString: String = idToIx.toString
+}
+
+/** :: Experimental ::
+  * Factory for [[EntityIdIxMap]].
+  */
+@Experimental
+object EntityIdIxMap {
+  /** Build an [[EntityIdIxMap]] from an RDD of entity IDs; the ID-to-index
+    * mapping comes from `BiMap.stringLong`.
+    */
+  def apply(keys: RDD[String]): EntityIdIxMap = {
+    val idToIx = BiMap.stringLong(keys)
+    new EntityIdIxMap(idToIx)
+  }
+}
+
+/** :: Experimental ::
+  * An [[EntityIdIxMap]] that additionally carries one data value per entity
+  * ID.
+  *
+  * @param idToData data value for each entity ID
+  * @param idToIx BiMap from entity ID to index
+  * @tparam A type of the per-entity data
+  */
+@Experimental
+class EntityMap[A](val idToData: Map[String, A],
+  override val idToIx: BiMap[String, Long]) extends EntityIdIxMap(idToIx) {
+
+  /** Derive the index mapping from the keys of `idToData` via
+    * `BiMap.stringLong`.
+    */
+  def this(idToData: Map[String, A]) = this(
+    idToData,
+    BiMap.stringLong(idToData.keySet)
+  )
+
+  /** Data of `id`, via `idToData(id)`; this is the unchecked lookup. */
+  def data(id: String): A = idToData(id)
+
+  /** Data of the entity at index `ix`; unchecked lookup through `ixToId(ix)`
+    * and then `idToData`.
+    */
+  def data(ix: Long): A = idToData(ixToId(ix))
+
+  /** Data of `id` as an Option. */
+  def getData(id: String): Option[A] = idToData.get(id)
+
+  /** Data of the entity at index `ix` as an Option. Both steps use the
+    * Option-returning lookups, so the result no longer depends on the
+    * unchecked `ixToId(ix)` succeeding: an index without an ID, or an ID
+    * without data, both give None.
+    */
+  def getData(ix: Long): Option[A] =
+    ixToId.get(ix).flatMap(id => idToData.get(id))
+
+  /** Data of `id`, or `default` (a by-name argument) when there is none. */
+  def getOrElseData(id: String, default: => A): A =
+    getData(id).getOrElse(default)
+
+  /** Data of the entity at index `ix`, or `default` (a by-name argument)
+    * when `getData(ix)` is empty.
+    */
+  def getOrElseData(ix: Long, default: => A): A =
+    getData(ix).getOrElse(default)
+
+  /** New EntityMap restricted to the entries of `idToIx.take(n)`. */
+  override def take(n: Int): EntityMap[A] = {
+    val newIdToIx = idToIx.take(n)
+    // `filter` builds a strict Map. Before Scala 2.13 `filterKeys` returns a
+    // lazy view that is not Serializable (scala/bug#7005), which does not
+    // belong inside this Serializable class.
+    val keptData = idToData.filter { case (id, _) => newIdToIx.contains(id) }
+    new EntityMap[A](keptData, newIdToIx)
+  }
+
+  override def toString: String = {
+    s"idToData: ${idToData.toString} " + s"idToix: ${idToIx.toString}"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala
new file mode 100644
index 0000000..a58e642
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala
@@ -0,0 +1,135 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import com.github.nscala_time.time.Imports._
+import org.apache.predictionio.annotation.DeveloperApi
+import org.json4s._
+
+/** :: DeveloperApi ::
+  * Stores meta information for each evaluation instance.
+  *
+  * Every field has a default: empty strings, empty maps, and the current
+  * time for both timestamps.
+  *
+  * @param id Instance ID.
+  * @param status Status of this instance.
+  * @param startTime Start time of this instance.
+  * @param endTime End time of this instance.
+  * @param evaluationClass Evaluation class name of this instance.
+  * @param engineParamsGeneratorClass Engine parameters generator class name of this instance.
+  * @param batch Batch label of this instance.
+  * @param env The environment in which this instance was created.
+  * @param sparkConf Spark configuration of this instance, as a String-to-String
+  *                  map. NOTE(review): presumably the custom Spark settings used
+  *                  for the run -- confirm.
+  * @param evaluatorResults Results of the evaluator.
+  * @param evaluatorResultsHTML HTML results of the evaluator.
+  * @param evaluatorResultsJSON JSON results of the evaluator.
+  * @group Meta Data
+  */
+@DeveloperApi
+case class EvaluationInstance(
+  id: String = "",
+  status: String = "",
+  startTime: DateTime = DateTime.now,
+  endTime: DateTime = DateTime.now,
+  evaluationClass: String = "",
+  engineParamsGeneratorClass: String = "",
+  batch: String = "",
+  env: Map[String, String] = Map(),
+  sparkConf: Map[String, String] = Map(),
+  evaluatorResults: String = "",
+  evaluatorResultsHTML: String = "",
+  evaluatorResultsJSON: String = "")
+
+/** :: DeveloperApi ::
+  * Base trait of the [[EvaluationInstance]] data access object
+  *
+  * All members are abstract; storage backends supply the implementations.
+  *
+  * @group Meta Data
+  */
+@DeveloperApi
+trait EvaluationInstances {
+  /** Insert a new [[EvaluationInstance]]
+    *
+    * @param i the instance to insert
+    * @return a String; NOTE(review): presumably the ID under which the
+    *         instance was stored -- confirm against the implementations
+    */
+  def insert(i: EvaluationInstance): String
+
+  /** Get an [[EvaluationInstance]] by ID
+    *
+    * @param id ID of the instance to look up
+    * @return the matching instance, if any
+    */
+  def get(id: String): Option[EvaluationInstance]
+
+  /** Get all [[EvaluationInstance]]s */
+  def getAll: Seq[EvaluationInstance]
+
+  /** Get instances that are produced by evaluation and have run to completion,
+    * reverse sorted by the start time
+    */
+  def getCompleted: Seq[EvaluationInstance]
+
+  /** Update an [[EvaluationInstance]]
+    *
+    * @param i the instance carrying the new values
+    */
+  def update(i: EvaluationInstance): Unit
+
+  /** Delete an [[EvaluationInstance]]
+    *
+    * @param id ID of the instance to delete
+    */
+  def delete(id: String): Unit
+}
+
+/** :: DeveloperApi ::
+  * JSON4S serializer for [[EvaluationInstance]]
+  *
+  * Reading starts from `EvaluationInstance()` (all defaults) and overwrites
+  * one property per recognised JSON field. Unknown fields are ignored, as are
+  * string-typed properties whose JSON value is not a string. Writing emits
+  * every property, with both timestamps rendered through `toString`.
+  *
+  * @group Meta Data
+  */
+@DeveloperApi
+class EvaluationInstanceSerializer extends CustomSerializer[EvaluationInstance](
+  _ => ({
+    case JObject(fields) =>
+      implicit val formats = DefaultFormats
+      fields.foldLeft(EvaluationInstance()) { case (i, field) =>
+        field match {
+          case JField("id", JString(id)) => i.copy(id = id)
+          case JField("status", JString(status)) => i.copy(status = status)
+          case JField("startTime", JString(startTime)) =>
+            i.copy(startTime = Utils.stringToDateTime(startTime))
+          case JField("endTime", JString(endTime)) =>
+            i.copy(endTime = Utils.stringToDateTime(endTime))
+          case JField("evaluationClass", JString(evaluationClass)) =>
+            i.copy(evaluationClass = evaluationClass)
+          case JField("engineParamsGeneratorClass", JString(engineParamsGeneratorClass)) =>
+            i.copy(engineParamsGeneratorClass = engineParamsGeneratorClass)
+          case JField("batch", JString(batch)) => i.copy(batch = batch)
+          // The two map-valued fields accept any JSON value and defer to
+          // json4s extraction.
+          case JField("env", env) =>
+            i.copy(env = Extraction.extract[Map[String, String]](env))
+          case JField("sparkConf", sparkConf) =>
+            i.copy(sparkConf = Extraction.extract[Map[String, String]](sparkConf))
+          case JField("evaluatorResults", JString(evaluatorResults)) =>
+            i.copy(evaluatorResults = evaluatorResults)
+          case JField("evaluatorResultsHTML", JString(evaluatorResultsHTML)) =>
+            i.copy(evaluatorResultsHTML = evaluatorResultsHTML)
+          case JField("evaluatorResultsJSON", JString(evaluatorResultsJSON)) =>
+            i.copy(evaluatorResultsJSON = evaluatorResultsJSON)
+          case _ => i
+        }
+      }
+  }, {
+    case i: EvaluationInstance =>
+      JObject(
+        JField("id", JString(i.id)) ::
+          JField("status", JString(i.status)) ::
+          JField("startTime", JString(i.startTime.toString)) ::
+          JField("endTime", JString(i.endTime.toString)) ::
+          JField("evaluationClass", JString(i.evaluationClass)) ::
+          JField("engineParamsGeneratorClass", JString(i.engineParamsGeneratorClass)) ::
+          JField("batch", JString(i.batch)) ::
+          JField("env", Extraction.decompose(i.env)(DefaultFormats)) ::
+          JField("sparkConf", Extraction.decompose(i.sparkConf)(DefaultFormats)) ::
+          JField("evaluatorResults", JString(i.evaluatorResults)) ::
+          JField("evaluatorResultsHTML", JString(i.evaluatorResultsHTML)) ::
+          JField("evaluatorResultsJSON", JString(i.evaluatorResultsJSON)) ::
+          Nil
+      )
+  }
+  )
+)