Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:42 UTC

[11/34] incubator-predictionio git commit: rename all except examples

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Event.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Event.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Event.scala
new file mode 100644
index 0000000..6169a02
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Event.scala
@@ -0,0 +1,164 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+
+/** Each event in the Event Store can be represented by fields in this case
+  * class.
+  *
+  * @param eventId Unique ID of this event.
+  * @param event Name of this event.
+  * @param entityType Type of the entity associated with this event.
+  * @param entityId ID of the entity associated with this event.
+  * @param targetEntityType Type of the target entity associated with this
+  *                         event.
+  * @param targetEntityId ID of the target entity associated with this event.
+  * @param properties Properties associated with this event.
+  * @param eventTime Time at which this event happened.
+  * @param tags Tags of this event.
+  * @param prId PredictedResultId of this event.
+  * @param creationTime Time at which this event was created in the system.
+  * @group Event Data
+  */
+case class Event(
+  val eventId: Option[String] = None,
+  val event: String,
+  val entityType: String,
+  val entityId: String,
+  val targetEntityType: Option[String] = None,
+  val targetEntityId: Option[String] = None,
+  val properties: DataMap = DataMap(), // default empty
+  val eventTime: DateTime = DateTime.now,
+  val tags: Seq[String] = Nil,
+  val prId: Option[String] = None,
+  val creationTime: DateTime = DateTime.now
+) {
+  override def toString(): String = {
+    s"Event(id=$eventId,event=$event,eType=$entityType,eId=$entityId," +
+    s"tType=$targetEntityType,tId=$targetEntityId,p=$properties,t=$eventTime," +
+    s"tags=$tags,pKey=$prId,ct=$creationTime)"
+  }
+}
+
+/** :: DeveloperApi ::
+  * Utilities for validating [[Event]]s
+  *
+  * @group Event Data
+  */
+@DeveloperApi
+object EventValidation {
+  /** Default time zone is set to UTC */
+  val defaultTimeZone = DateTimeZone.UTC
+
+  /** Checks whether an event name contains a reserved prefix
+    *
+    * @param name Event name
+    * @return true if event name starts with \$ or pio_, false otherwise
+    */
+  def isReservedPrefix(name: String): Boolean = name.startsWith("$") ||
+    name.startsWith("pio_")
+
+  /** PredictionIO reserves some single entity event names. They are currently
+    * \$set, \$unset, and \$delete.
+    */
+  val specialEvents = Set("$set", "$unset", "$delete")
+
+  /** Checks whether an event name is a special PredictionIO event name
+    *
+    * @param name Event name
+    * @return true if the name is a special event, false otherwise
+    */
+  def isSpecialEvents(name: String): Boolean = specialEvents.contains(name)
+
+  /** Validate an [[Event]], throwing exceptions when the candidate violates any
+    * of the following:
+    *
+    *  - event name must not be empty
+    *  - entityType must not be empty
+    *  - entityId must not be empty
+    *  - targetEntityType must not be Some of an empty string
+    *  - targetEntityId must not be Some of an empty string
+    *  - targetEntityType and targetEntityId must be both Some or None
+    *  - properties must not be empty when event is \$unset
+    *  - event name must be a special event if it has a reserved prefix
+    *  - targetEntityType and targetEntityId must be None if the event name has
+    *    a reserved prefix
+    *  - entityType must be a built-in entity type if entityType has a
+    *    reserved prefix
+    *  - targetEntityType must be a built-in entity type if targetEntityType is
+    *    Some and has a reserved prefix
+    *
+    * @param e Event to be validated
+    */
+  def validate(e: Event): Unit = {
+
+    require(!e.event.isEmpty, "event must not be empty.")
+    require(!e.entityType.isEmpty, "entityType must not be empty string.")
+    require(!e.entityId.isEmpty, "entityId must not be empty string.")
+    require(e.targetEntityType.map(!_.isEmpty).getOrElse(true),
+      "targetEntityType must not be empty string")
+    require(e.targetEntityId.map(!_.isEmpty).getOrElse(true),
+      "targetEntityId must not be empty string.")
+    require(!((e.targetEntityType != None) && (e.targetEntityId == None)),
+      "targetEntityType and targetEntityId must be specified together.")
+    require(!((e.targetEntityType == None) && (e.targetEntityId != None)),
+      "targetEntityType and targetEntityId must be specified together.")
+    require(!((e.event == "$unset") && e.properties.isEmpty),
+      "properties cannot be empty for $unset event")
+    require(!isReservedPrefix(e.event) || isSpecialEvents(e.event),
+      s"${e.event} is not a supported reserved event name.")
+    require(!isSpecialEvents(e.event) ||
+      ((e.targetEntityType == None) && (e.targetEntityId == None)),
+      s"Reserved event ${e.event} cannot have targetEntity")
+    require(!isReservedPrefix(e.entityType) ||
+      isBuiltinEntityTypes(e.entityType),
+      s"The entityType ${e.entityType} is not allowed. " +
+        s"'pio_' is a reserved name prefix.")
+    require(e.targetEntityType.map{ t =>
+      (!isReservedPrefix(t) || isBuiltinEntityTypes(t))}.getOrElse(true),
+      s"The targetEntityType ${e.targetEntityType.get} is not allowed. " +
+        s"'pio_' is a reserved name prefix.")
+    validateProperties(e)
+  }
+
+  /** Defines built-in entity types. The current built-in type is pio_pr. */
+  val builtinEntityTypes: Set[String] = Set("pio_pr")
+
+  /** Defines built-in properties. This is currently empty. */
+  val builtinProperties: Set[String] = Set()
+
+  /** Checks whether an entity type is a built-in entity type */
+  def isBuiltinEntityTypes(name: String): Boolean = builtinEntityTypes.contains(name)
+
+  /** Validate event properties, throwing exceptions when the candidate violates
+    * any of the following:
+    *
+    *  - property name must not contain a reserved prefix
+    *
+    * @param e Event to be validated
+    */
+  def validateProperties(e: Event): Unit = {
+    e.properties.keySet.foreach { k =>
+      require(!isReservedPrefix(k) || builtinProperties.contains(k),
+        s"The property ${k} is not allowed. " +
+          s"'pio_' is a reserved name prefix.")
+    }
+  }
+
+}
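
For illustration, a minimal usage sketch of the case class and validator above. The entity names and property values are hypothetical, and the DataMap(String) factory is assumed from DataMap.scala, which is not part of this hunk:

import org.apache.predictionio.data.storage.{DataMap, Event, EventValidation}
import org.joda.time.DateTime

// A hypothetical "user u1 rates item i1" event; eventId and creationTime
// are left at their defaults.
val rate = Event(
  event = "rate",
  entityType = "user",
  entityId = "u1",
  targetEntityType = Some("item"),
  targetEntityId = Some("i1"),
  properties = DataMap("""{"rating": 4}"""),
  eventTime = new DateTime(2015, 6, 1, 0, 0, EventValidation.defaultTimeZone))

// Throws IllegalArgumentException if any rule listed above is violated, e.g.
// a reserved-prefix name like "$rate", or a target ID without a target type.
EventValidation.validate(rate)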

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala
new file mode 100644
index 0000000..7d4fce3
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala
@@ -0,0 +1,236 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.data.{Utils => DataUtils}
+import org.joda.time.DateTime
+import org.json4s._
+import scala.util.{Try, Success, Failure}
+
+/** :: DeveloperApi ::
+  * Support library for dealing with [[Event]] and JSON4S
+  *
+  * @group Event Data
+  */
+@DeveloperApi
+object EventJson4sSupport {
+  /** This is set to org.json4s.DefaultFormats. Do not use JSON4S to serialize
+    * or deserialize Joda-Time DateTime because it has time zone issues
+    * (as of version 3.2.10)
+    */
+  implicit val formats = DefaultFormats
+
+  /** :: DeveloperApi ::
+    * Convert JSON from Event Server to [[Event]]
+    *
+    * @return deserialization routine used by [[APISerializer]]
+    */
+  @DeveloperApi
+  def readJson: PartialFunction[JValue, Event] = {
+    case JObject(x) => {
+      val fields = new DataMap(x.toMap)
+      // use get() for fields required in the JSON
+      // use getOpt() for optional fields
+      try {
+        val event = fields.get[String]("event")
+        val entityType = fields.get[String]("entityType")
+        val entityId = fields.get[String]("entityId")
+        val targetEntityType = fields.getOpt[String]("targetEntityType")
+        val targetEntityId = fields.getOpt[String]("targetEntityId")
+        val properties = fields.getOrElse[Map[String, JValue]](
+          "properties", Map())
+        // default currentTime expressed in the UTC time zone
+        lazy val currentTime = DateTime.now(EventValidation.defaultTimeZone)
+        val eventTime = fields.getOpt[String]("eventTime")
+          .map{ s =>
+            try {
+              DataUtils.stringToDateTime(s)
+            } catch {
+              case _: Exception =>
+                throw new MappingException(s"Fail to extract eventTime ${s}")
+            }
+          }.getOrElse(currentTime)
+
+        // disable tags from API for now.
+        val tags = List()
+      // val tags = fields.getOpt[Seq[String]]("tags").getOrElse(List())
+
+        val prId = fields.getOpt[String]("prId")
+
+        // don't allow users to set creationTime from the API for now.
+        val creationTime = currentTime
+      // val creationTime = fields.getOpt[String]("creationTime")
+      //   .map{ s =>
+      //     try {
+      //       DataUtils.stringToDateTime(s)
+      //     } catch {
+      //       case _: Exception =>
+      //         throw new MappingException(s"Fail to extract creationTime ${s}")
+      //     }
+      //   }.getOrElse(currentTime)
+
+
+        val newEvent = Event(
+          event = event,
+          entityType = entityType,
+          entityId = entityId,
+          targetEntityType = targetEntityType,
+          targetEntityId = targetEntityId,
+          properties = DataMap(properties),
+          eventTime = eventTime,
+          prId = prId,
+          creationTime = creationTime
+        )
+        EventValidation.validate(newEvent)
+        newEvent
+      } catch {
+        case e: Exception => throw new MappingException(e.toString, e)
+      }
+    }
+  }
+
+  /** :: DeveloperApi ::
+    * Convert [[Event]] to JSON for use by the Event Server
+    *
+    * @return serialization routine used by [[APISerializer]]
+    */
+  @DeveloperApi
+  def writeJson: PartialFunction[Any, JValue] = {
+    case d: Event => {
+      JObject(
+        JField("eventId",
+          d.eventId.map( eid => JString(eid)).getOrElse(JNothing)) ::
+        JField("event", JString(d.event)) ::
+        JField("entityType", JString(d.entityType)) ::
+        JField("entityId", JString(d.entityId)) ::
+        JField("targetEntityType",
+          d.targetEntityType.map(JString(_)).getOrElse(JNothing)) ::
+        JField("targetEntityId",
+          d.targetEntityId.map(JString(_)).getOrElse(JNothing)) ::
+        JField("properties", d.properties.toJObject) ::
+        JField("eventTime", JString(DataUtils.dateTimeToString(d.eventTime))) ::
+        // disable tags from API for now
+        // JField("tags", JArray(d.tags.toList.map(JString(_)))) ::
+        JField("prId",
+          d.prId.map(JString(_)).getOrElse(JNothing)) ::
+        JField("creationTime",
+          JString(DataUtils.dateTimeToString(d.creationTime))) ::
+        Nil)
+    }
+  }
+
+  /** :: DeveloperApi ::
+    * Convert JSON4S JValue to [[Event]]
+    *
+    * @return deserialization routine used by [[DBSerializer]]
+    */
+  @DeveloperApi
+  def deserializeFromJValue: PartialFunction[JValue, Event] = {
+    case jv: JValue => {
+      val event = (jv \ "event").extract[String]
+      val entityType = (jv \ "entityType").extract[String]
+      val entityId = (jv \ "entityId").extract[String]
+      val targetEntityType = (jv \ "targetEntityType").extract[Option[String]]
+      val targetEntityId = (jv \ "targetEntityId").extract[Option[String]]
+      val properties = (jv \ "properties").extract[JObject]
+      val eventTime = DataUtils.stringToDateTime(
+        (jv \ "eventTime").extract[String])
+      val tags = (jv \ "tags").extract[Seq[String]]
+      val prId = (jv \ "prId").extract[Option[String]]
+      val creationTime = DataUtils.stringToDateTime(
+        (jv \ "creationTime").extract[String])
+      Event(
+        event = event,
+        entityType = entityType,
+        entityId = entityId,
+        targetEntityType = targetEntityType,
+        targetEntityId = targetEntityId,
+        properties = DataMap(properties),
+        eventTime = eventTime,
+        tags = tags,
+        prId = prId,
+        creationTime = creationTime)
+    }
+  }
+
+  /** :: DeveloperApi ::
+    * Convert [[Event]] to JSON4S JValue
+    *
+    * @return serialization routine used by [[DBSerializer]]
+    */
+  @DeveloperApi
+  def serializeToJValue: PartialFunction[Any, JValue] = {
+    case d: Event => {
+      JObject(
+        JField("event", JString(d.event)) ::
+        JField("entityType", JString(d.entityType)) ::
+        JField("entityId", JString(d.entityId)) ::
+        JField("targetEntityType",
+          d.targetEntityType.map(JString(_)).getOrElse(JNothing)) ::
+        JField("targetEntityId",
+          d.targetEntityId.map(JString(_)).getOrElse(JNothing)) ::
+        JField("properties", d.properties.toJObject) ::
+        JField("eventTime", JString(DataUtils.dateTimeToString(d.eventTime))) ::
+        JField("tags", JArray(d.tags.toList.map(JString(_)))) ::
+        JField("prId",
+          d.prId.map(JString(_)).getOrElse(JNothing)) ::
+        JField("creationTime",
+          JString(DataUtils.dateTimeToString(d.creationTime))) ::
+        Nil)
+    }
+  }
+
+  /** :: DeveloperApi ::
+    * Custom JSON4S serializer for [[Event]] intended to be used by database
+    * access, or anywhere that demands serdes of [[Event]] to/from JSON4S JValue
+    */
+  @DeveloperApi
+  class DBSerializer extends CustomSerializer[Event](format => (
+    deserializeFromJValue, serializeToJValue))
+
+  /** :: DeveloperApi ::
+    * Custom JSON4S serializer for [[Event]] intended to be used by the Event
+    * Server, or anywhere that demands serdes of [[Event]] to/from JSON
+    */
+  @DeveloperApi
+  class APISerializer extends CustomSerializer[Event](format => (
+    readJson, writeJson))
+}
+
+
+@DeveloperApi
+object BatchEventsJson4sSupport {
+  implicit val formats = DefaultFormats
+
+  @DeveloperApi
+  def readJson: PartialFunction[JValue, Seq[Try[Event]]] = {
+    case JArray(events) => {
+      events.map { event =>
+        try {
+          Success(EventJson4sSupport.readJson(event))
+        } catch {
+          case e: Exception => Failure(e)
+        }
+      }
+    }
+  }
+
+  @DeveloperApi
+  class APISerializer extends CustomSerializer[Seq[Try[Event]]](format => (readJson, Map.empty))
+}
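
For illustration, a sketch of round-tripping an Event Server payload through the APISerializer above (assumes json4s-native on the classpath; the payload is hypothetical):

import org.apache.predictionio.data.storage.{Event, EventJson4sSupport}
import org.json4s.{DefaultFormats, Formats}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write

implicit val formats: Formats =
  DefaultFormats + new EventJson4sSupport.APISerializer

// readJson runs via extract[Event]: a missing eventTime falls back to the
// current time in UTC, and EventValidation.validate() checks the result.
val e = parse("""{
  "event": "$set",
  "entityType": "user",
  "entityId": "u1",
  "properties": {"age": 21},
  "eventTime": "2015-06-01T00:00:00.000Z"
}""").extract[Event]

// writeJson renders empty Options as JNothing, so absent fields are omitted.
val json: String = write(e)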

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/LEventAggregator.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/LEventAggregator.scala b/data/src/main/scala/org/apache/predictionio/data/storage/LEventAggregator.scala
new file mode 100644
index 0000000..6836c6d
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/LEventAggregator.scala
@@ -0,0 +1,145 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.joda.time.DateTime
+
+/** :: DeveloperApi ::
+  * Provides aggregation support for [[Event]]s to [[LEvents]]. Engine developers
+  * should use [[org.apache.predictionio.data.store.LEventStore]] instead of using this
+  * directly.
+  *
+  * @group Event Data
+  */
+@DeveloperApi
+object LEventAggregator {
+  /** :: DeveloperApi ::
+    * Aggregate all properties, grouped by entity ID, given an iterator of
+    * [[Event]]s, keeping the latest property values from all [[Event]]s along
+    * with their first and last updated times
+    *
+    * @param events An iterator of [[Event]]s whose properties will be aggregated
+    * @return A map of entity ID to [[PropertyMap]]
+    */
+  @DeveloperApi
+  def aggregateProperties(events: Iterator[Event]): Map[String, PropertyMap] = {
+    events.toList
+      .groupBy(_.entityId)
+      .mapValues(_.sortBy(_.eventTime.getMillis)
+        .foldLeft[Prop](Prop())(propAggregator))
+      .filter{ case (k, v) => v.dm.isDefined }
+      .mapValues{ v =>
+        require(v.firstUpdated.isDefined,
+          "Unexpected Error: firstUpdated cannot be None.")
+        require(v.lastUpdated.isDefined,
+          "Unexpected Error: lastUpdated cannot be None.")
+
+        PropertyMap(
+          fields = v.dm.get.fields,
+          firstUpdated = v.firstUpdated.get,
+          lastUpdated = v.lastUpdated.get
+        )
+      }
+  }
+
+  /** :: DeveloperApi ::
+    * Aggregate all properties given an iterator of [[Event]]s, keeping the
+    * latest property values along with their first and last updated times
+    *
+    * @param events An iterator of [[Event]]s whose properties will be aggregated
+    * @return An optional [[PropertyMap]]
+    */
+  @DeveloperApi
+  def aggregatePropertiesSingle(events: Iterator[Event])
+  : Option[PropertyMap] = {
+    val prop = events.toList
+      .sortBy(_.eventTime.getMillis)
+      .foldLeft[Prop](Prop())(propAggregator)
+
+    prop.dm.map{ d =>
+      require(prop.firstUpdated.isDefined,
+        "Unexpected Error: firstUpdated cannot be None.")
+      require(prop.lastUpdated.isDefined,
+        "Unexpected Error: lastUpdated cannot be None.")
+
+      PropertyMap(
+        fields = d.fields,
+        firstUpdated = prop.firstUpdated.get,
+        lastUpdated = prop.lastUpdated.get
+      )
+    }
+  }
+
+  /** Event names that control aggregation: \$set, \$unset, and \$delete */
+  val eventNames = List("$set", "$unset", "$delete")
+
+  private
+  def dataMapAggregator: ((Option[DataMap], Event) => Option[DataMap]) = {
+    (p, e) => {
+      e.event match {
+        case "$set" => {
+          if (p == None) {
+            Some(e.properties)
+          } else {
+            p.map(_ ++ e.properties)
+          }
+        }
+        case "$unset" => {
+          if (p == None) {
+            None
+          } else {
+            p.map(_ -- e.properties.keySet)
+          }
+        }
+        case "$delete" => None
+        case _ => p // do nothing for others
+      }
+    }
+  }
+
+  private
+  def propAggregator: ((Prop, Event) => Prop) = {
+    (p, e) => {
+      e.event match {
+        case "$set" | "$unset" | "$delete" => {
+          Prop(
+            dm = dataMapAggregator(p.dm, e),
+            firstUpdated = p.firstUpdated.map { t =>
+              first(t, e.eventTime)
+            }.orElse(Some(e.eventTime)),
+            lastUpdated = p.lastUpdated.map { t =>
+              last(t, e.eventTime)
+            }.orElse(Some(e.eventTime))
+          )
+        }
+        case _ => p // do nothing for others
+      }
+    }
+  }
+
+  private
+  def first(a: DateTime, b: DateTime): DateTime = if (b.isBefore(a)) b else a
+
+  private
+  def last(a: DateTime, b: DateTime): DateTime = if (b.isAfter(a)) b else a
+
+  private case class Prop(
+    dm: Option[DataMap] = None,
+    firstUpdated: Option[DateTime] = None,
+    lastUpdated: Option[DateTime] = None
+  )
+}
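
For illustration, a sketch of the $set/$unset fold implemented above (the DataMap(String) factory is assumed from DataMap.scala, outside this hunk):

import org.apache.predictionio.data.storage.{DataMap, Event, LEventAggregator, PropertyMap}
import org.joda.time.DateTime

val t0 = new DateTime(2015, 6, 1, 0, 0)
val events = Iterator(
  Event(event = "$set", entityType = "user", entityId = "u1",
    properties = DataMap("""{"a": 1, "b": 2}"""), eventTime = t0),
  Event(event = "$unset", entityType = "user", entityId = "u1",
    properties = DataMap("""{"b": 0}"""), eventTime = t0.plusDays(1)))

// Events are sorted by eventTime and folded: the later $unset removes "b",
// leaving fields {"a": 1}, firstUpdated = t0 and lastUpdated = t0 plus a day.
val props: Option[PropertyMap] = LEventAggregator.aggregatePropertiesSingle(events)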

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
new file mode 100644
index 0000000..d6e753c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
@@ -0,0 +1,489 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.annotation.Experimental
+
+import scala.concurrent.Future
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext
+import scala.concurrent.TimeoutException
+
+import org.joda.time.DateTime
+
+/** :: DeveloperApi ::
+  * Base trait of a data access object that directly returns [[Event]] without
+  * going through Spark's parallelization. Engine developers should use
+  * [[org.apache.predictionio.data.store.LEventStore]] instead of using this directly.
+  *
+  * @group Event Data
+  */
+@DeveloperApi
+trait LEvents {
+  /** Default timeout for asynchronous operations, set to 1 minute */
+  val defaultTimeout = Duration(60, "seconds")
+
+  /** :: DeveloperApi ::
+    * Initialize Event Store for an app ID and optionally a channel ID.
+    * This routine is to be called when an app is first created.
+    *
+    * @param appId App ID
+    * @param channelId Optional channel ID
+    * @return true if initialization was successful; false otherwise.
+    */
+  @DeveloperApi
+  def init(appId: Int, channelId: Option[Int] = None): Boolean
+
+  /** :: DeveloperApi ::
+    * Remove Event Store for an app ID and optional channel ID.
+    *
+    * @param appId App ID
+    * @param channelId Optional channel ID
+    * @return true if removal was successful; false otherwise.
+    */
+  @DeveloperApi
+  def remove(appId: Int, channelId: Option[Int] = None): Boolean
+
+  /** :: DeveloperApi ::
+    * Close this Event Store interface object, e.g. close connection, release
+    * resources, etc.
+    */
+  @DeveloperApi
+  def close(): Unit
+
+  /** :: DeveloperApi ::
+    * Insert an [[Event]] in a non-blocking fashion.
+    *
+    * @param event An [[Event]] to be inserted
+    * @param appId App ID for the [[Event]] to be inserted into
+    */
+  @DeveloperApi
+  def futureInsert(event: Event, appId: Int)(implicit ec: ExecutionContext):
+    Future[String] = futureInsert(event, appId, None)
+
+  /** :: DeveloperApi ::
+    * Insert an [[Event]] in a non-blocking fashion.
+    *
+    * @param event An [[Event]] to be inserted
+    * @param appId App ID for the [[Event]] to be inserted into
+    * @param channelId Optional channel ID for the [[Event]] to be inserted into
+    */
+  @DeveloperApi
+  def futureInsert(
+    event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[String]
+
+  /** :: DeveloperApi ::
+    * Get an [[Event]] in a non-blocking fashion.
+    *
+    * @param eventId ID of the [[Event]]
+    * @param appId ID of the app that contains the [[Event]]
+    */
+  @DeveloperApi
+  def futureGet(eventId: String, appId: Int)(implicit ec: ExecutionContext):
+    Future[Option[Event]] = futureGet(eventId, appId, None)
+
+  /** :: DeveloperApi ::
+    * Get an [[Event]] in a non-blocking fashion.
+    *
+    * @param eventId ID of the [[Event]]
+    * @param appId ID of the app that contains the [[Event]]
+    * @param channelId Optional channel ID that contains the [[Event]]
+    */
+  @DeveloperApi
+  def futureGet(
+      eventId: String,
+      appId: Int,
+      channelId: Option[Int]
+    )(implicit ec: ExecutionContext): Future[Option[Event]]
+
+  /** :: DeveloperApi ::
+    * Delete an [[Event]] in a non-blocking fashion.
+    *
+    * @param eventId ID of the [[Event]]
+    * @param appId ID of the app that contains the [[Event]]
+    */
+  @DeveloperApi
+  def futureDelete(eventId: String, appId: Int)(implicit ec: ExecutionContext):
+    Future[Boolean] = futureDelete(eventId, appId, None)
+
+  /** :: DeveloperApi ::
+    * Delete an [[Event]] in a non-blocking fashion.
+    *
+    * @param eventId ID of the [[Event]]
+    * @param appId ID of the app that contains the [[Event]]
+    * @param channelId Optional channel ID that contains the [[Event]]
+    */
+  @DeveloperApi
+  def futureDelete(
+      eventId: String,
+      appId: Int,
+      channelId: Option[Int]
+    )(implicit ec: ExecutionContext): Future[Boolean]
+
+  /** :: DeveloperApi ::
+    * Reads from the database and returns a Future of an Iterator of [[Event]]s.
+    *
+    * @param appId return events of this app ID
+    * @param channelId return events of this channel ID (default channel if it's None)
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param reversed Reverse the order.
+    *   - return oldest events first if None or Some(false) (default)
+    *   - return latest events first if Some(true)
+    * @param ec ExecutionContext
+    * @return Future[Iterator[Event]]
+    */
+  @DeveloperApi
+  def futureFind(
+      appId: Int,
+      channelId: Option[Int] = None,
+      startTime: Option[DateTime] = None,
+      untilTime: Option[DateTime] = None,
+      entityType: Option[String] = None,
+      entityId: Option[String] = None,
+      eventNames: Option[Seq[String]] = None,
+      targetEntityType: Option[Option[String]] = None,
+      targetEntityId: Option[Option[String]] = None,
+      limit: Option[Int] = None,
+      reversed: Option[Boolean] = None
+    )(implicit ec: ExecutionContext): Future[Iterator[Event]]
+
+  /** Aggregates properties of entities based on these special events:
+    * \$set, \$unset, and \$delete, and returns a Future of a Map of
+    * entityId to properties.
+    *
+    * @param appId use events of this app ID
+    * @param channelId use events of this channel ID (default channel if it's None)
+    * @param entityType aggregate properties of the entities of this entityType
+    * @param startTime use events with eventTime >= startTime
+    * @param untilTime use events with eventTime < untilTime
+    * @param required only keep entities with these required properties defined
+    * @param ec ExecutionContext
+    * @return Future[Map[String, PropertyMap]]
+    */
+  private[predictionio] def futureAggregateProperties(
+    appId: Int,
+    channelId: Option[Int] = None,
+    entityType: String,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    required: Option[Seq[String]] = None)(implicit ec: ExecutionContext):
+    Future[Map[String, PropertyMap]] = {
+      futureFind(
+        appId = appId,
+        channelId = channelId,
+        startTime = startTime,
+        untilTime = untilTime,
+        entityType = Some(entityType),
+        eventNames = Some(LEventAggregator.eventNames)
+      ).map{ eventIt =>
+        val dm = LEventAggregator.aggregateProperties(eventIt)
+        if (required.isDefined) {
+          dm.filter { case (k, v) =>
+            required.get.map(v.contains(_)).reduce(_ && _)
+          }
+        } else dm
+      }
+    }
+
+  /**
+    * :: Experimental ::
+    *
+    * Aggregate properties of the specified entity (entityType + entityId)
+    * based on these special events:
+    * \$set, \$unset, and \$delete, and
+    * returns a Future of Option[PropertyMap]
+    *
+    * @param appId use events of this app ID
+    * @param channelId use events of this channel ID (default channel if it's None)
+    * @param entityType the entityType
+    * @param entityId the entityId
+    * @param startTime use events with eventTime >= startTime
+    * @param untilTime use events with eventTime < untilTime
+    * @param ec ExecutionContext
+    * @return Future[Option[PropertyMap]]
+    */
+  @Experimental
+  private[predictionio] def futureAggregatePropertiesOfEntity(
+    appId: Int,
+    channelId: Option[Int] = None,
+    entityType: String,
+    entityId: String,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None)(implicit ec: ExecutionContext):
+    Future[Option[PropertyMap]] = {
+      futureFind(
+        appId = appId,
+        channelId = channelId,
+        startTime = startTime,
+        untilTime = untilTime,
+        entityType = Some(entityType),
+        entityId = Some(entityId),
+        eventNames = Some(LEventAggregator.eventNames)
+      ).map{ eventIt =>
+        LEventAggregator.aggregatePropertiesSingle(eventIt)
+      }
+    }
+
+  // The following methods are blocking counterparts of the futures above
+  private[predictionio] def insert(event: Event, appId: Int,
+    channelId: Option[Int] = None,
+    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+    String = {
+    Await.result(futureInsert(event, appId, channelId), timeout)
+  }
+
+  private[predictionio] def get(eventId: String, appId: Int,
+    channelId: Option[Int] = None,
+    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+    Option[Event] = {
+    Await.result(futureGet(eventId, appId, channelId), timeout)
+  }
+
+  private[predictionio] def delete(eventId: String, appId: Int,
+    channelId: Option[Int] = None,
+    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+    Boolean = {
+    Await.result(futureDelete(eventId, appId, channelId), timeout)
+  }
+
+  /** Reads from the database and returns an iterator of events.
+    *
+    * @param appId return events of this app ID
+    * @param channelId return events of this channel ID (default channel if it's None)
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param reversed Reverse the order (should be used with both
+    *   targetEntityType and targetEntityId specified)
+    *   - return oldest events first if None or Some(false) (default)
+    *   - return latest events first if Some(true)
+    * @param ec ExecutionContext
+    * @return Iterator[Event]
+    */
+  private[predictionio] def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    limit: Option[Int] = None,
+    reversed: Option[Boolean] = None,
+    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+    Iterator[Event] = {
+      Await.result(futureFind(
+        appId = appId,
+        channelId = channelId,
+        startTime = startTime,
+        untilTime = untilTime,
+        entityType = entityType,
+        entityId = entityId,
+        eventNames = eventNames,
+        targetEntityType = targetEntityType,
+        targetEntityId = targetEntityId,
+        limit = limit,
+        reversed = reversed), timeout)
+  }
+
+  // NOTE: remove in next release
+  @deprecated("Use find() instead.", "0.9.2")
+  private[predictionio] def findLegacy(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    limit: Option[Int] = None,
+    reversed: Option[Boolean] = None,
+    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+    Either[StorageError, Iterator[Event]] = {
+      try {
+        // return Either for legacy usage
+        Right(Await.result(futureFind(
+          appId = appId,
+          channelId = channelId,
+          startTime = startTime,
+          untilTime = untilTime,
+          entityType = entityType,
+          entityId = entityId,
+          eventNames = eventNames,
+          targetEntityType = targetEntityType,
+          targetEntityId = targetEntityId,
+          limit = limit,
+          reversed = reversed), timeout))
+      } catch {
+        case e: TimeoutException => Left(StorageError(s"${e}"))
+        case e: Exception => Left(StorageError(s"${e}"))
+      }
+  }
+
+  /** Reads events of the specified entity.
+    *
+    * @param appId return events of this app ID
+    * @param channelId return events of this channel ID (default channel if it's None)
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param latest Return latest event first (default true)
+    * @param ec ExecutionContext
+    * @return Either[StorageError, Iterator[Event]]
+    */
+  // NOTE: remove this function in next release
+  @deprecated("Use LEventStore.findByEntity() instead.", "0.9.2")
+  def findSingleEntity(
+    appId: Int,
+    channelId: Option[Int] = None,
+    entityType: String,
+    entityId: String,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None,
+    latest: Boolean = true,
+    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+    Either[StorageError, Iterator[Event]] = {
+
+    findLegacy(
+      appId = appId,
+      channelId = channelId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = Some(entityType),
+      entityId = Some(entityId),
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      limit = limit,
+      reversed = Some(latest),
+      timeout = timeout)
+
+  }
+
+  /** Aggregates properties of entities based on these special events:
+    * \$set, \$unset, and \$delete, and returns a Map of entityId to
+    * properties.
+    *
+    * @param appId use events of this app ID
+    * @param channelId use events of this channel ID (default channel if it's None)
+    * @param entityType aggregate properties of the entities of this entityType
+    * @param startTime use events with eventTime >= startTime
+    * @param untilTime use events with eventTime < untilTime
+    * @param required only keep entities with these required properties defined
+    * @param ec ExecutionContext
+    * @return Map[String, PropertyMap]
+    */
+  private[predictionio] def aggregateProperties(
+    appId: Int,
+    channelId: Option[Int] = None,
+    entityType: String,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    required: Option[Seq[String]] = None,
+    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+    Map[String, PropertyMap] = {
+    Await.result(futureAggregateProperties(
+      appId = appId,
+      channelId = channelId,
+      entityType = entityType,
+      startTime = startTime,
+      untilTime = untilTime,
+      required = required), timeout)
+  }
+
+  /**
+    * :: Experimental ::
+    *
+    * Aggregate properties of the specified entity (entityType + entityId)
+    * based on these special events:
+    * \$set, \$unset, and \$delete, and
+    * returns Option[PropertyMap]
+    *
+    * @param appId use events of this app ID
+    * @param channelId use events of this channel ID
+    * @param entityType the entityType
+    * @param entityId the entityId
+    * @param startTime use events with eventTime >= startTime
+    * @param untilTime use events with eventTime < untilTime
+    * @param ec ExecutionContext
+    * @return Option[PropertyMap]
+    */
+  @Experimental
+  private[predictionio] def aggregatePropertiesOfEntity(
+    appId: Int,
+    channelId: Option[Int] = None,
+    entityType: String,
+    entityId: String,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+    Option[PropertyMap] = {
+
+    Await.result(futureAggregatePropertiesOfEntity(
+      appId = appId,
+      channelId = channelId,
+      entityType = entityType,
+      entityId = entityId,
+      startTime = startTime,
+      untilTime = untilTime), timeout)
+  }
+
+}
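
For illustration, a sketch of the Some(None)/Some(Some(x)) query convention documented above, assuming `events` is a concrete LEvents implementation obtained from the storage layer:

import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.predictionio.data.storage.LEvents

def queries(events: LEvents): Unit = {
  // All "view" events of user u1 whose target is an item, any item ID.
  val views = events.futureFind(
    appId = 1,
    entityType = Some("user"),
    entityId = Some("u1"),
    eventNames = Some(Seq("view")),
    targetEntityType = Some(Some("item")), // target type must be "item"
    targetEntityId = None)                 // no restriction on target ID

  // All $set events that carry no target entity at all.
  val sets = events.futureFind(
    appId = 1,
    eventNames = Some(Seq("$set")),
    targetEntityType = Some(None),
    targetEntityId = Some(None))
}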

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Models.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Models.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Models.scala
new file mode 100644
index 0000000..15d7444
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Models.scala
@@ -0,0 +1,80 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import com.google.common.io.BaseEncoding
+import org.apache.predictionio.annotation.DeveloperApi
+import org.json4s._
+
+/** :: DeveloperApi ::
+  * Stores a model for each engine instance
+  *
+  * @param id ID of the model, which should be the same as engine instance ID
+  * @param models Trained models of all algorithms
+  * @group Model Data
+  */
+@DeveloperApi
+case class Model(
+  id: String,
+  models: Array[Byte])
+
+/** :: DeveloperApi ::
+  * Base trait of the [[Model]] data access object
+  *
+  * @group Model Data
+  */
+@DeveloperApi
+trait Models {
+  /** Insert a new [[Model]] */
+  def insert(i: Model): Unit
+
+  /** Get a [[Model]] by ID */
+  def get(id: String): Option[Model]
+
+  /** Delete a [[Model]] */
+  def delete(id: String): Unit
+}
+
+/** :: DeveloperApi ::
+  * JSON4S serializer for [[Model]]
+  *
+  * @group Model Data
+  */
+@DeveloperApi
+class ModelSerializer extends CustomSerializer[Model](
+  format => ({
+    case JObject(fields) =>
+      implicit val formats = DefaultFormats
+      val seed = Model(
+          id = "",
+          models = Array[Byte]())
+      fields.foldLeft(seed) { case (i, field) =>
+        field match {
+          case JField("id", JString(id)) => i.copy(id = id)
+          case JField("models", JString(models)) =>
+            i.copy(models = BaseEncoding.base64.decode(models))
+          case _ => i
+        }
+      }
+  },
+  {
+    case i: Model =>
+      JObject(
+        JField("id", JString(i.id)) ::
+        JField("models", JString(BaseEncoding.base64.encode(i.models))) ::
+        Nil)
+  }
+))
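
For illustration, a sketch of round-tripping a Model through the serializer above (the engine instance ID is hypothetical):

import org.apache.predictionio.data.storage.{Model, ModelSerializer}
import org.json4s.{DefaultFormats, Extraction, Formats}

implicit val formats: Formats = DefaultFormats + new ModelSerializer

val m = Model(id = "engine-instance-1", models = Array[Byte](1, 2, 3))

// models is written out as a Base64 string and decoded back on the way in.
val json = Extraction.decompose(m)
val roundTripped = json.extract[Model]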

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala b/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala
new file mode 100644
index 0000000..72287dd
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala
@@ -0,0 +1,209 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.joda.time.DateTime
+
+import org.json4s.JValue
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+// each JValue is stored together with the time at which it was set
+private[predictionio] case class PropTime(val d: JValue, val t: Long)
+    extends Serializable
+
+private[predictionio] case class SetProp (
+  val fields: Map[String, PropTime],
+  // last set time. Note: fields could be empty with valid set time
+  val t: Long) extends Serializable {
+
+  def ++ (that: SetProp): SetProp = {
+    val commonKeys = fields.keySet.intersect(that.fields.keySet)
+
+    val common: Map[String, PropTime] = commonKeys.map { k =>
+      val thisData = this.fields(k)
+      val thatData = that.fields(k)
+      // only keep the value with latest time
+      val v = if (thisData.t > thatData.t) thisData else thatData
+      (k, v)
+    }.toMap
+
+    val combinedFields = common ++
+      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
+
+    // keep the latest set time
+    val combinedT = if (this.t > that.t) this.t else that.t
+
+    SetProp(
+      fields = combinedFields,
+      t = combinedT
+    )
+  }
+}
+
+private[predictionio] case class UnsetProp (fields: Map[String, Long])
+    extends Serializable {
+  def ++ (that: UnsetProp): UnsetProp = {
+    val commonKeys = fields.keySet.intersect(that.fields.keySet)
+
+    val common: Map[String, Long] = commonKeys.map { k =>
+      val thisData = this.fields(k)
+      val thatData = that.fields(k)
+      // only keep the value with latest time
+      val v = if (thisData > thatData) thisData else thatData
+      (k, v)
+    }.toMap
+
+    val combinedFields = common ++
+      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
+
+    UnsetProp(
+      fields = combinedFields
+    )
+  }
+}
+
+private[predictionio] case class DeleteEntity (t: Long) extends Serializable {
+  def ++ (that: DeleteEntity): DeleteEntity = {
+    if (this.t > that.t) this else that
+  }
+}
+
+private[predictionio] case class EventOp (
+  val setProp: Option[SetProp] = None,
+  val unsetProp: Option[UnsetProp] = None,
+  val deleteEntity: Option[DeleteEntity] = None,
+  val firstUpdated: Option[DateTime] = None,
+  val lastUpdated: Option[DateTime] = None
+) extends Serializable {
+
+  def ++ (that: EventOp): EventOp = {
+    val firstUp = (this.firstUpdated ++ that.firstUpdated).reduceOption{
+      (a, b) => if (b.getMillis < a.getMillis) b else a
+    }
+    val lastUp = (this.lastUpdated ++ that.lastUpdated).reduceOption {
+      (a, b) => if (b.getMillis > a.getMillis) b else a
+    }
+
+    EventOp(
+      setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
+      unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
+      deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _),
+      firstUpdated = firstUp,
+      lastUpdated = lastUp
+    )
+  }
+
+  def toPropertyMap(): Option[PropertyMap] = {
+    setProp.flatMap { set =>
+
+      val unsetKeys: Set[String] = unsetProp.map( unset =>
+        unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet
+      ).getOrElse(Set())
+
+      val combinedFields = deleteEntity.map { delete =>
+        if (delete.t >= set.t) {
+          None
+        } else {
+          val deleteKeys: Set[String] = set.fields
+            .filter { case (k, PropTime(kv, t)) =>
+              (delete.t >= t)
+            }.keySet
+          Some(set.fields -- unsetKeys -- deleteKeys)
+        }
+      }.getOrElse{
+        Some(set.fields -- unsetKeys)
+      }
+
+      // Note: mapValues() doesn't return concrete Map and causes
+      // NotSerializableException issue. Use map(identity) to work around this.
+      // see https://issues.scala-lang.org/browse/SI-7005
+      combinedFields.map{ f =>
+        require(firstUpdated.isDefined,
+          "Unexpected Error: firstUpdated cannot be None.")
+        require(lastUpdated.isDefined,
+          "Unexpected Error: lastUpdated cannot be None.")
+        PropertyMap(
+          fields = f.mapValues(_.d).map(identity),
+          firstUpdated = firstUpdated.get,
+          lastUpdated = lastUpdated.get
+        )
+      }
+    }
+  }
+
+}
+
+private[predictionio] object EventOp {
+  // create EventOp from Event object
+  def apply(e: Event): EventOp = {
+    val t = e.eventTime.getMillis
+    e.event match {
+      case "$set" => {
+        val fields = e.properties.fields.mapValues(jv =>
+          PropTime(jv, t)
+        ).map(identity)
+
+        EventOp(
+          setProp = Some(SetProp(fields = fields, t = t)),
+          firstUpdated = Some(e.eventTime),
+          lastUpdated = Some(e.eventTime)
+        )
+      }
+      case "$unset" => {
+        val fields = e.properties.fields.mapValues(jv => t).map(identity)
+        EventOp(
+          unsetProp = Some(UnsetProp(fields = fields)),
+          firstUpdated = Some(e.eventTime),
+          lastUpdated = Some(e.eventTime)
+        )
+      }
+      case "$delete" => {
+        EventOp(
+          deleteEntity = Some(DeleteEntity(t)),
+          firstUpdated = Some(e.eventTime),
+          lastUpdated = Some(e.eventTime)
+        )
+      }
+      case _ => {
+        EventOp()
+      }
+    }
+  }
+}
+
+
+private[predictionio] object PEventAggregator {
+
+  val eventNames = List("$set", "$unset", "$delete")
+
+  def aggregateProperties(eventsRDD: RDD[Event]): RDD[(String, PropertyMap)] = {
+    eventsRDD
+      .map( e => (e.entityId, EventOp(e) ))
+      .aggregateByKey[EventOp](EventOp())(
+        // within same partition
+        seqOp = { case (u, v) => u ++ v },
+        // across partition
+        combOp = { case (accu, u) => accu ++ u }
+      )
+      .mapValues(_.toPropertyMap)
+      .filter{ case (k, v) => v.isDefined }
+      .map{ case (k, v) => (k, v.get) }
+  }
+
+}
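
For illustration, a sketch of the RDD aggregation above on a local SparkContext. Because the object is package-private, such code would have to live under the org.apache.predictionio namespace (e.g. in a test suite); the DataMap(String) factory is again assumed from DataMap.scala:

package org.apache.predictionio.data.storage

import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime

object AggregationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("pevent-agg-sketch"))
    val t0 = new DateTime(2015, 6, 1, 0, 0)
    val events = sc.parallelize(Seq(
      Event(event = "$set", entityType = "user", entityId = "u1",
        properties = DataMap("""{"a": 1, "b": 2}"""), eventTime = t0),
      Event(event = "$unset", entityType = "user", entityId = "u1",
        properties = DataMap("""{"b": 0}"""), eventTime = t0.plusDays(1))))

    // EventOps merge per entity ID with ++; each op carries its event time,
    // so the result does not depend on partitioning or arrival order.
    PEventAggregator.aggregateProperties(events)
      .collect()
      .foreach { case (id, pm) => println(s"$id -> $pm") }
    sc.stop()
  }
}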

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/PEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/PEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/PEvents.scala
new file mode 100644
index 0000000..49e5a5e
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/PEvents.scala
@@ -0,0 +1,182 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.annotation.Experimental
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.joda.time.DateTime
+
+import scala.reflect.ClassTag
+
+/** :: DeveloperApi ::
+  * Base trait of a data access object that returns [[Event]] related RDD data
+  * structure. Engine developers should use
+  * [[org.apache.predictionio.data.store.PEventStore]] instead of using this directly.
+  *
+  * @group Event Data
+  */
+@DeveloperApi
+trait PEvents extends Serializable {
+  @transient protected lazy val logger = Logger[this.type]
+  @deprecated("Use PEventStore.find() instead.", "0.9.2")
+  def getByAppIdAndTimeAndEntity(appId: Int,
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    entityType: Option[String],
+    entityId: Option[String])(sc: SparkContext): RDD[Event] = {
+      find(
+        appId = appId,
+        startTime = startTime,
+        untilTime = untilTime,
+        entityType = entityType,
+        entityId = entityId,
+        eventNames = None
+      )(sc)
+    }
+
+  /** :: DeveloperApi ::
+    * Read from the database and return the events. The deprecation here is
+    * intended for engine developers only.
+    *
+    * @param appId return events of this app ID
+    * @param channelId return events of this channel ID (default channel if it's None)
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param sc Spark context
+    * @return RDD[Event]
+    */
+  @deprecated("Use PEventStore.find() instead.", "0.9.2")
+  @DeveloperApi
+  def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event]
+
+  /** Aggregate properties of entities based on these special events:
+    * \$set, \$unset, and \$delete. The deprecation here is intended for
+    * engine developers only.
+    *
+    * @param appId use events of this app ID
+    * @param channelId use events of this channel ID (default channel if it's None)
+    * @param entityType aggregate properties of the entities of this entityType
+    * @param startTime use events with eventTime >= startTime
+    * @param untilTime use events with eventTime < untilTime
+    * @param required only keep entities with these required properties defined
+    * @param sc Spark context
+    * @return RDD[(String, PropertyMap)] RDD of entityId and PropertyMap pair
+    */
+  @deprecated("Use PEventStore.aggregateProperties() instead.", "0.9.2")
+  def aggregateProperties(
+    appId: Int,
+    channelId: Option[Int] = None,
+    entityType: String,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    required: Option[Seq[String]] = None)
+    (sc: SparkContext): RDD[(String, PropertyMap)] = {
+    val eventRDD = find(
+      appId = appId,
+      channelId = channelId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = Some(entityType),
+      eventNames = Some(PEventAggregator.eventNames))(sc)
+
+    val dmRDD = PEventAggregator.aggregateProperties(eventRDD)
+
+    required map { r =>
+      dmRDD.filter { case (k, v) =>
+        r.map(v.contains(_)).reduce(_ && _)
+      }
+    } getOrElse dmRDD
+  }
+
+  /** :: Experimental ::
+    * Extract an EntityMap[A] from events of the given entityType.
+    * NOTE: the resulting EntityMap[A] is local (not an RDD).
+    */
+  @deprecated("Use PEventStore.aggregateProperties() instead.", "0.9.2")
+  @Experimental
+  def extractEntityMap[A: ClassTag](
+    appId: Int,
+    entityType: String,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    required: Option[Seq[String]] = None)
+    (sc: SparkContext)(extract: DataMap => A): EntityMap[A] = {
+    val idToData: Map[String, A] = aggregateProperties(
+      appId = appId,
+      entityType = entityType,
+      startTime = startTime,
+      untilTime = untilTime,
+      required = required
+    )(sc).map{ case (id, dm) =>
+      try {
+        (id, extract(dm))
+      } catch {
+        case e: Exception => {
+          logger.error(s"Failed to get extract entity from DataMap $dm of " +
+            s"entityId $id.", e)
+          throw e
+        }
+      }
+    }.collectAsMap.toMap
+
+    new EntityMap(idToData)
+  }
+
+  /** :: DeveloperApi ::
+    * Write events to database
+    *
+    * @param events RDD of Event
+    * @param appId the app ID
+    * @param sc Spark Context
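+    *
+    * A minimal sketch (hypothetical app ID and events RDD):
+    * {{{
+    * Storage.getPEvents().write(eventsRDD, appId = 1)(sc)
+    * }}}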
+    */
+  @DeveloperApi
+  def write(events: RDD[Event], appId: Int)(sc: SparkContext): Unit =
+    write(events, appId, None)(sc)
+
+  /** :: DeveloperApi ::
+    * Write events to database
+    *
+    * @param events RDD of Event
+    * @param appId the app ID
+    * @param channelId channel ID (default channel if it's None)
+    * @param sc Spark Context
+    */
+  @DeveloperApi
+  def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala b/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala
new file mode 100644
index 0000000..9935558
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala
@@ -0,0 +1,96 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.predictionio.data.storage
+
+import org.joda.time.DateTime
+
+import org.json4s.JValue
+import org.json4s.JObject
+import org.json4s.native.JsonMethods.parse
+
+/** A PropertyMap stores the aggregated properties of an entity.
+  * Internally it is a Map whose keys are property names and whose values are
+  * the corresponding JSON values. Use the get() method to retrieve the value
+  * of a mandatory property, or getOpt() to retrieve the value of an optional
+  * property.
+  *
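+  * A minimal sketch (hypothetical property names):
+  * {{{
+  * val rating: Double = propertyMap.get[Double]("rating")
+  * val genre: Option[String] = propertyMap.getOpt[String]("genre")
+  * }}}
+  *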
+  * @param fields Map of property name to JValue
+  * @param firstUpdated first updated time of this PropertyMap
+  * @param lastUpdated last updated time of this PropertyMap
+  */
+class PropertyMap(
+  fields: Map[String, JValue],
+  val firstUpdated: DateTime,
+  val lastUpdated: DateTime
+) extends DataMap(fields) {
+
+  override
+  def toString: String = s"PropertyMap(${fields}, ${firstUpdated}, ${lastUpdated})"
+
+  override
+  def hashCode: Int =
+    41 * (
+      41 * (
+        41 + fields.hashCode
+      ) + firstUpdated.hashCode
+    ) + lastUpdated.hashCode
+
+  override
+  def equals(other: Any): Boolean = other match {
+    case that: PropertyMap => {
+      (that.canEqual(this)) &&
+      (super.equals(that)) &&
+      (this.firstUpdated.equals(that.firstUpdated)) &&
+      (this.lastUpdated.equals(that.lastUpdated))
+    }
+    case that: DataMap => { // for testing purpose
+      super.equals(that)
+    }
+    case _ => false
+  }
+
+  override
+  def canEqual(other: Any): Boolean = other.isInstanceOf[PropertyMap]
+}
+
+/** Companion object of the [[PropertyMap]] class. */
+object PropertyMap {
+
+  /** Create a PropertyMap from a Map of String to JValue,
+    * firstUpdated and lastUpdated time.
+    *
+    * @param fields a Map of String to JValue
+    * @param firstUpdated First updated time
+    * @param lastUpdated Last updated time
+    * @return a new PropertyMap
+    */
+  def apply(fields: Map[String, JValue],
+    firstUpdated: DateTime, lastUpdated: DateTime): PropertyMap =
+    new PropertyMap(fields, firstUpdated, lastUpdated)
+
+  /** Create a PropertyMap from a JSON String with firstUpdated and
+    * lastUpdated times.
+    * @param js JSON String. eg """{ "a": 1, "b": "foo" }"""
+    * @param firstUpdated First updated time
+    * @param lastUpdated Last updated time
+    * @return a new PropertyMap
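+    *
+    * A minimal sketch:
+    * {{{
+    * val now = DateTime.now
+    * val pm = PropertyMap("""{ "a": 1, "b": "foo" }""", now, now)
+    * }}}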
+    */
+  def apply(js: String, firstUpdated: DateTime, lastUpdated: DateTime)
+  : PropertyMap = apply(
+      fields = parse(js).asInstanceOf[JObject].obj.toMap,
+      firstUpdated = firstUpdated,
+      lastUpdated = lastUpdated
+    )
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
new file mode 100644
index 0000000..1f170be
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
@@ -0,0 +1,403 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import java.lang.reflect.InvocationTargetException
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.annotation.DeveloperApi
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.language.existentials
+import scala.reflect.runtime.universe._
+
+/** :: DeveloperApi ::
+  * Any storage backend driver needs to implement this trait with exactly
+  * '''StorageClient''' as the class name. The PredictionIO storage layer will
+  * look for this class when it instantiates the actual backend for use by
+  * higher level storage access APIs.
+  *
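+  * A minimal sketch of a driver, assuming a hypothetical JDBC-flavored
+  * backend with a hypothetical "URL" property in its configuration:
+  * {{{
+  * package org.apache.predictionio.data.storage.jdbc
+  *
+  * class StorageClient(val config: StorageClientConfig)
+  *   extends BaseStorageClient {
+  *   // a database connection serves as the actual client object
+  *   val client = java.sql.DriverManager.getConnection(
+  *     config.properties("URL"))
+  *   override val prefix = "JDBC"
+  * }
+  * }}}
+  *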
+  * @group Storage System
+  */
+@DeveloperApi
+trait BaseStorageClient {
+  /** Configuration of the '''StorageClient''' */
+  val config: StorageClientConfig
+
+  /** The actual client object. This could be a database connection or any kind
+    * of database access object.
+    */
+  val client: AnyRef
+
+  /** Set a prefix for storage class discovery. As an example, if this prefix
+    * is set as ''JDBC'', when the storage layer instantiates an implementation
+    * of [[Apps]], it will try to look for a class named ''JDBCApps''.
+    */
+  val prefix: String = ""
+}
+
+/** :: DeveloperApi ::
+  * A wrapper of storage client configuration that will be populated by
+  * PredictionIO automatically, and passed to the StorageClient during
+  * instantiation.
+  *
+  * @param parallel This is set to true by PredictionIO when the storage client
+  *                 is instantiated in a parallel data source.
+  * @param test This is set to true by PredictionIO when tests are being run.
+  * @param properties This is populated by PredictionIO automatically from
+  *                   environmental configuration variables. If you have these
+  *                   variables,
+  *                   - PIO_STORAGE_SOURCES_PGSQL_TYPE=jdbc
+  *                   - PIO_STORAGE_SOURCES_PGSQL_USERNAME=abc
+  *                   - PIO_STORAGE_SOURCES_PGSQL_PASSWORD=xyz
+  *
+  *                   this field will be filled as a map of string to string:
+  *                   - TYPE -> jdbc
+  *                   - USERNAME -> abc
+  *                   - PASSWORD -> xyz
+  *
+  * @group Storage System
+  */
+@DeveloperApi
+case class StorageClientConfig(
+  parallel: Boolean = false, // parallelized access (RDD)?
+  test: Boolean = false, // test mode config
+  properties: Map[String, String] = Map())
+
+/** :: DeveloperApi ::
+  * Thrown when a StorageClient runs into an exceptional condition
+  *
+  * @param message Exception error message
+  * @param cause The underlying exception that caused the exception
+  * @group Storage System
+  */
+@DeveloperApi
+class StorageClientException(message: String, cause: Throwable)
+  extends RuntimeException(message, cause)
+
+@deprecated("Use StorageException", "0.9.2")
+private[predictionio] case class StorageError(message: String)
+
+/** :: DeveloperApi ::
+  * Thrown by data access objects when they run into exceptional conditions
+  *
+  * @param message Exception error message
+  * @param cause The underlying exception that caused the exception
+  *
+  * @group Storage System
+  */
+@DeveloperApi
+class StorageException(message: String, cause: Throwable)
+  extends Exception(message, cause) {
+
+  def this(message: String) = this(message, null)
+}
+
+/** Backend-agnostic data storage layer with lazy initialization. Use this
+  * object when you need to interface with Event Store in your engine.
+  *
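+  * A minimal sketch:
+  * {{{
+  * val lEvents = Storage.getLEvents()  // local (single machine) access
+  * val pEvents = Storage.getPEvents()  // parallel (Spark RDD) access
+  * }}}
+  *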
+  * @group Storage System
+  */
+object Storage extends Logging {
+  private case class ClientMeta(
+    sourceType: String,
+    client: BaseStorageClient,
+    config: StorageClientConfig)
+
+  private case class DataObjectMeta(sourceName: String, namespace: String)
+
+  private var errors = 0
+
+  private val sourcesPrefix = "PIO_STORAGE_SOURCES"
+
+  private val sourceTypesRegex = """PIO_STORAGE_SOURCES_([^_]+)_TYPE""".r
+
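+  // e.g. PIO_STORAGE_SOURCES_PGSQL_TYPE=jdbc in the environment yields
+  // sourceKeys containing "PGSQL"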
+  private val sourceKeys: Seq[String] = sys.env.keys.toSeq.flatMap { k =>
+    sourceTypesRegex findFirstIn k match {
+      case Some(sourceTypesRegex(sourceType)) => Seq(sourceType)
+      case None => Nil
+    }
+  }
+
+  if (sourceKeys.isEmpty) warn("There is no properly configured data source.")
+
+  private val s2cm = scala.collection.mutable.Map[String, Option[ClientMeta]]()
+
+  /** Names of the built-in data repositories. */
+  private val EventDataRepository = "EVENTDATA"
+  private val ModelDataRepository = "MODELDATA"
+  private val MetaDataRepository = "METADATA"
+
+  private val repositoriesPrefix = "PIO_STORAGE_REPOSITORIES"
+
+  private val repositoryNamesRegex =
+    """PIO_STORAGE_REPOSITORIES_([^_]+)_NAME""".r
+
+  private val repositoryKeys: Seq[String] = sys.env.keys.toSeq.flatMap { k =>
+    repositoryNamesRegex findFirstIn k match {
+      case Some(repositoryNamesRegex(repositoryName)) => Seq(repositoryName)
+      case None => Nil
+    }
+  }
+
+  if (repositoryKeys.isEmpty) {
+    warn("There is no properly configured repository.")
+  }
+
+  private val requiredRepositories = Seq(MetaDataRepository)
+
+  requiredRepositories foreach { r =>
+    if (!repositoryKeys.contains(r)) {
+      error(s"Required repository (${r}) configuration is missing.")
+      errors += 1
+    }
+  }
+  private val repositoriesToDataObjectMeta: Map[String, DataObjectMeta] =
+    repositoryKeys.map(r =>
+      try {
+        val keyedPath = repositoriesPrefixPath(r)
+        val name = sys.env(prefixPath(keyedPath, "NAME"))
+        val sourceName = sys.env(prefixPath(keyedPath, "SOURCE"))
+        if (sourceKeys.contains(sourceName)) {
+          r -> DataObjectMeta(
+            sourceName = sourceName,
+            namespace = name)
+        } else {
+          error(s"$sourceName is not a configured storage source.")
+          r -> DataObjectMeta("", "")
+        }
+      } catch {
+        case e: Throwable =>
+          error(e.getMessage)
+          errors += 1
+          r -> DataObjectMeta("", "")
+      }
+    ).toMap
+
+  if (errors > 0) {
+    error(s"There were $errors configuration errors. Exiting.")
+    sys.exit(errors)
+  }
+
+  // End of constructor and field definitions; method definitions begin below
+
+  private def prefixPath(prefix: String, body: String) = s"${prefix}_$body"
+
+  private def sourcesPrefixPath(body: String) = prefixPath(sourcesPrefix, body)
+
+  private def repositoriesPrefixPath(body: String) =
+    prefixPath(repositoriesPrefix, body)
+
+  private def sourcesToClientMeta(
+      source: String,
+      parallel: Boolean,
+      test: Boolean): Option[ClientMeta] = {
+    val sourceName = if (parallel) s"parallel-$source" else source
+    s2cm.getOrElseUpdate(sourceName, updateS2CM(source, parallel, test))
+  }
+
+  private def getClient(
+    clientConfig: StorageClientConfig,
+    pkg: String): BaseStorageClient = {
+    val className = "org.apache.predictionio.data.storage." + pkg + ".StorageClient"
+    try {
+      Class.forName(className).getConstructors()(0).newInstance(clientConfig).
+        asInstanceOf[BaseStorageClient]
+    } catch {
+      case e: ClassNotFoundException =>
+        val originalClassName = pkg + ".StorageClient"
+        Class.forName(originalClassName).getConstructors()(0).
+          newInstance(clientConfig).asInstanceOf[BaseStorageClient]
+      case e: java.lang.reflect.InvocationTargetException =>
+        throw e.getCause
+    }
+  }
+
+  /** Get the StorageClient config data from PIO Framework's environment variables */
+  def getConfig(sourceName: String): Option[StorageClientConfig] =
+    s2cm.get(sourceName).flatten.map(_.config)
+
+  private def updateS2CM(k: String, parallel: Boolean, test: Boolean):
+  Option[ClientMeta] = {
+    try {
+      val keyedPath = sourcesPrefixPath(k)
+      val sourceType = sys.env(prefixPath(keyedPath, "TYPE"))
+      val props = sys.env.filter(t => t._1.startsWith(keyedPath)).map(
+        t => t._1.replace(s"${keyedPath}_", "") -> t._2)
+      val clientConfig = StorageClientConfig(
+        properties = props,
+        parallel = parallel,
+        test = test)
+      val client = getClient(clientConfig, sourceType)
+      Some(ClientMeta(sourceType, client, clientConfig))
+    } catch {
+      case e: Throwable =>
+        error(s"Error initializing storage client for source ${k}", e)
+        errors += 1
+        None
+    }
+  }
+
+  private[predictionio]
+  def getDataObjectFromRepo[T](repo: String, test: Boolean = false)
+    (implicit tag: TypeTag[T]): T = {
+    val repoDOMeta = repositoriesToDataObjectMeta(repo)
+    val repoDOSourceName = repoDOMeta.sourceName
+    getDataObject[T](repoDOSourceName, repoDOMeta.namespace, test = test)
+  }
+
+  private[predictionio]
+  def getPDataObject[T](repo: String)(implicit tag: TypeTag[T]): T = {
+    val repoDOMeta = repositoriesToDataObjectMeta(repo)
+    val repoDOSourceName = repoDOMeta.sourceName
+    getPDataObject[T](repoDOSourceName, repoDOMeta.namespace)
+  }
+
+  private[predictionio] def getDataObject[T](
+      sourceName: String,
+      namespace: String,
+      parallel: Boolean = false,
+      test: Boolean = false)(implicit tag: TypeTag[T]): T = {
+    val clientMeta = sourcesToClientMeta(sourceName, parallel, test) getOrElse {
+      throw new StorageClientException(
+        s"Data source $sourceName was not properly initialized.", null)
+    }
+    val sourceType = clientMeta.sourceType
+    val ctorArgs = dataObjectCtorArgs(clientMeta.client, namespace)
+    val classPrefix = clientMeta.client.prefix
+    val originalClassName = tag.tpe.toString.split('.')
+    val rawClassName = sourceType + "." + classPrefix + originalClassName.last
+    val className = "org.apache.predictionio.data.storage." + rawClassName
+    val clazz = try {
+      Class.forName(className)
+    } catch {
+      case e: ClassNotFoundException =>
+        try {
+          Class.forName(rawClassName)
+        } catch {
+          case e: ClassNotFoundException =>
+            throw new StorageClientException("No storage backend " +
+              "implementation can be found (tried both " +
+              s"$className and $rawClassName)", e)
+        }
+    }
+    val constructor = clazz.getConstructors()(0)
+    try {
+      constructor.newInstance(ctorArgs: _*).
+        asInstanceOf[T]
+    } catch {
+      case e: IllegalArgumentException =>
+        error(
+          "Unable to instantiate data object with class '" +
+          constructor.getDeclaringClass.getName + " because its constructor" +
+          " does not have the right number of arguments." +
+          " Number of required constructor arguments: " +
+          ctorArgs.size + "." +
+          " Number of existing constructor arguments: " +
+          constructor.getParameterTypes.size + "." +
+          s" Storage source name: ${sourceName}." +
+          s" Exception message: ${e.getMessage}).", e)
+        errors += 1
+        throw e
+      case e: java.lang.reflect.InvocationTargetException =>
+        throw e.getCause
+    }
+  }
+
+  private def getPDataObject[T](
+      sourceName: String,
+      databaseName: String)(implicit tag: TypeTag[T]): T =
+    getDataObject[T](sourceName, databaseName, parallel = true)
+
+  private def dataObjectCtorArgs(
+      client: BaseStorageClient,
+      namespace: String): Seq[AnyRef] = {
+    Seq(client.client, client.config, namespace)
+  }
+
+  private[predictionio] def verifyAllDataObjects(): Unit = {
+    info("Verifying Meta Data Backend (Source: " +
+      s"${repositoriesToDataObjectMeta(MetaDataRepository).sourceName})...")
+    getMetaDataEngineManifests()
+    getMetaDataEngineInstances()
+    getMetaDataEvaluationInstances()
+    getMetaDataApps()
+    getMetaDataAccessKeys()
+    info("Verifying Model Data Backend (Source: " +
+      s"${repositoriesToDataObjectMeta(ModelDataRepository).sourceName})...")
+    getModelDataModels()
+    info("Verifying Event Data Backend (Source: " +
+      s"${repositoriesToDataObjectMeta(EventDataRepository).sourceName})...")
+    val eventsDb = getLEvents(test = true)
+    info("Test writing to Event Store (App Id 0)...")
+    // use appId=0 for testing purposes
+    eventsDb.init(0)
+    eventsDb.insert(Event(
+      event = "test",
+      entityType = "test",
+      entityId = "test"), 0)
+    eventsDb.remove(0)
+    eventsDb.close()
+  }
+
+  private[predictionio] def getMetaDataEngineManifests(): EngineManifests =
+    getDataObjectFromRepo[EngineManifests](MetaDataRepository)
+
+  private[predictionio] def getMetaDataEngineInstances(): EngineInstances =
+    getDataObjectFromRepo[EngineInstances](MetaDataRepository)
+
+  private[predictionio] def getMetaDataEvaluationInstances(): EvaluationInstances =
+    getDataObjectFromRepo[EvaluationInstances](MetaDataRepository)
+
+  private[predictionio] def getMetaDataApps(): Apps =
+    getDataObjectFromRepo[Apps](MetaDataRepository)
+
+  private[predictionio] def getMetaDataAccessKeys(): AccessKeys =
+    getDataObjectFromRepo[AccessKeys](MetaDataRepository)
+
+  private[predictionio] def getMetaDataChannels(): Channels =
+    getDataObjectFromRepo[Channels](MetaDataRepository)
+
+  private[predictionio] def getModelDataModels(): Models =
+    getDataObjectFromRepo[Models](ModelDataRepository)
+
+  /** Obtains a data access object that returns [[Event]] related local data
+    * structure.
+    */
+  def getLEvents(test: Boolean = false): LEvents =
+    getDataObjectFromRepo[LEvents](EventDataRepository, test = test)
+
+  /** Obtains a data access object that returns [[Event]] related RDD data
+    * structure.
+    */
+  def getPEvents(): PEvents =
+    getPDataObject[PEvents](EventDataRepository)
+
+  def config: Map[String, Map[String, Map[String, String]]] = Map(
+    "sources" -> s2cm.toMap.map { case (source, clientMeta) =>
+      source -> clientMeta.map { cm =>
+        Map(
+          "type" -> cm.sourceType,
+          "config" -> cm.config.properties.map(t => s"${t._1} -> ${t._2}").mkString(", ")
+        )
+      }.getOrElse(Map.empty)
+    }
+  )
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Utils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Utils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Utils.scala
new file mode 100644
index 0000000..321b245
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Utils.scala
@@ -0,0 +1,47 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.joda.time.DateTime
+import org.joda.time.format.ISODateTimeFormat
+
+/** Backend-agnostic storage utilities. */
+private[predictionio] object Utils {
+  /**
+   * Add prefix to custom attribute keys.
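+   * e.g. addPrefixToAttributeKeys(Map("color" -> "red")) returns
+   * Map("ca_color" -> "red").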
+   */
+  def addPrefixToAttributeKeys[T](
+      attributes: Map[String, T],
+      prefix: String = "ca_"): Map[String, T] = {
+    attributes map { case (k, v) => (prefix + k, v) }
+  }
+
+  /** Remove prefix from custom attribute keys. */
+  def removePrefixFromAttributeKeys[T](
+      attributes: Map[String, T],
+      prefix: String = "ca_"): Map[String, T] = {
+    attributes map { case (k, v) => (k.stripPrefix(prefix), v) }
+  }
+
+  /**
+   * Appends App ID to any ID.
+   * Used for distinguishing different apps' data within a single collection.
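+   * e.g. idWithAppid(3, "user1") returns "3_user1".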
+   */
+  def idWithAppid(appid: Int, id: String): String = appid + "_" + id
+
+  def stringToDateTime(dt: String): DateTime =
+    ISODateTimeFormat.dateTimeParser.parseDateTime(dt)
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
new file mode 100644
index 0000000..7853d97
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -0,0 +1,116 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.AccessKey
+import org.apache.predictionio.data.storage.AccessKeys
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+import scala.util.Random
+
+/** Elasticsearch implementation of AccessKeys. */
+class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
+    extends AccessKeys with Logging {
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "accesskeys"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  def insert(accessKey: AccessKey): Option[String] = {
+    val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
+    update(accessKey.copy(key = key))
+    Some(key)
+  }
+
+  def get(key: String): Option[AccessKey] = {
+    try {
+      val response = client.prepareGet(
+        index,
+        estype,
+        key).get()
+      Some(read[AccessKey](response.getSourceAsString))
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+      case e: NullPointerException => None
+    }
+  }
+
+  def getAll(): Seq[AccessKey] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[AccessKey](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[AccessKey]()
+    }
+  }
+
+  def getByAppid(appid: Int): Seq[AccessKey] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("appid", appid))
+      ESUtils.getAll[AccessKey](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[AccessKey]()
+    }
+  }
+
+  def update(accessKey: AccessKey): Unit = {
+    try {
+      client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get()
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+
+  def delete(key: String): Unit = {
+    try {
+      client.prepareDelete(index, estype, key).get
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
new file mode 100644
index 0000000..6790b52
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -0,0 +1,127 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.App
+import org.apache.predictionio.data.storage.Apps
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Elasticsearch implementation of Apps. */
+class ESApps(client: Client, config: StorageClientConfig, index: String)
+  extends Apps with Logging {
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "apps"
+  private val seq = new ESSequences(client, config, index)
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  def insert(app: App): Option[Int] = {
+    val id =
+      if (app.id == 0) {
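+        // draw candidate IDs from the "apps" sequence until an unused one is found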
+        var roll = seq.genNext("apps")
+        while (get(roll).isDefined) roll = seq.genNext("apps")
+        roll
+      } else app.id
+    val realapp = app.copy(id = id)
+    update(realapp)
+    Some(id)
+  }
+
+  def get(id: Int): Option[App] = {
+    try {
+      val response = client.prepareGet(
+        index,
+        estype,
+        id.toString).get()
+      Some(read[App](response.getSourceAsString))
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+      case e: NullPointerException => None
+    }
+  }
+
+  def getByName(name: String): Option[App] = {
+    try {
+      val response = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("name", name)).get
+      val hits = response.getHits().hits()
+      if (hits.size > 0) {
+        Some(read[App](hits.head.getSourceAsString))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def getAll(): Seq[App] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[App](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[App]()
+    }
+  }
+
+  def update(app: App): Unit = {
+    try {
+      client.prepareIndex(index, estype, app.id.toString).
+        setSource(write(app)).get()
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+
+  def delete(id: Int): Unit = {
+    try {
+      client.prepareDelete(index, estype, id.toString).get
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+}