Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:42 UTC
[11/34] incubator-predictionio git commit: rename all except examples
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Event.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Event.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Event.scala
new file mode 100644
index 0000000..6169a02
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Event.scala
@@ -0,0 +1,164 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+
+/** Each event in the Event Store can be represented by fields in this case
+ * class.
+ *
+ * @param eventId Unique ID of this event.
+ * @param event Name of this event.
+ * @param entityType Type of the entity associated with this event.
+ * @param entityId ID of the entity associated with this event.
+ * @param targetEntityType Type of the target entity associated with this
+ * event.
+ * @param targetEntityId ID of the target entity associated with this event.
+ * @param properties Properties associated with this event.
+ * @param eventTime Time at which this event happened.
+ * @param tags Tags of this event.
+ * @param prId PredictedResultId of this event.
+ * @param creationTime Time at which this event was created in the system.
+ * @group Event Data
+ */
+case class Event(
+ eventId: Option[String] = None,
+ event: String,
+ entityType: String,
+ entityId: String,
+ targetEntityType: Option[String] = None,
+ targetEntityId: Option[String] = None,
+ properties: DataMap = DataMap(), // default empty
+ eventTime: DateTime = DateTime.now,
+ tags: Seq[String] = Nil,
+ prId: Option[String] = None,
+ creationTime: DateTime = DateTime.now
+) {
+ override def toString(): String = {
+ s"Event(id=$eventId,event=$event,eType=$entityType,eId=$entityId," +
+ s"tType=$targetEntityType,tId=$targetEntityId,p=$properties,t=$eventTime," +
+ s"tags=$tags,pKey=$prId,ct=$creationTime)"
+ }
+}
+
+/** :: DeveloperApi ::
+ * Utilities for validating [[Event]]s
+ *
+ * @group Event Data
+ */
+@DeveloperApi
+object EventValidation {
+ /** Default time zone is set to UTC */
+ val defaultTimeZone = DateTimeZone.UTC
+
+ /** Checks whether an event name contains a reserved prefix
+ *
+ * @param name Event name
+ * @return true if event name starts with \$ or pio_, false otherwise
+ */
+ def isReservedPrefix(name: String): Boolean = name.startsWith("$") ||
+ name.startsWith("pio_")
+
+ /** PredictionIO reserves some single entity event names. They are currently
+ * \$set, \$unset, and \$delete.
+ */
+ val specialEvents = Set("$set", "$unset", "$delete")
+
+ /** Checks whether an event name is a special PredictionIO event name
+ *
+ * @param name Event name
+ * @return true if the name is a special event, false otherwise
+ */
+ def isSpecialEvents(name: String): Boolean = specialEvents.contains(name)
+
+ /** Validate an [[Event]], throwing exceptions when the candidate violates any
+ * of the following:
+ *
+ * - event name must not be empty
+ * - entityType must not be empty
+ * - entityId must not be empty
+ * - targetEntityType must not be Some of an empty string
+ * - targetEntityId must not be Some of an empty string
+ * - targetEntityType and targetEntityId must both be Some or both be None
+ * - properties must not be empty when event is \$unset
+ * - event name must be a special event if it has a reserved prefix
+ * - targetEntityType and targetEntityId must be None if the event name has
+ * a reserved prefix
+ * - entityType must be a built-in entity type if entityType has a
+ * reserved prefix
+ * - targetEntityType must be a built-in entity type if targetEntityType is
+ * Some and has a reserved prefix
+ *
+ * @param e Event to be validated
+ */
+ def validate(e: Event): Unit = {
+
+ require(e.event.nonEmpty, "event must not be an empty string.")
+ require(e.entityType.nonEmpty, "entityType must not be an empty string.")
+ require(e.entityId.nonEmpty, "entityId must not be an empty string.")
+ require(e.targetEntityType.forall(_.nonEmpty),
+ "targetEntityType must not be an empty string.")
+ require(e.targetEntityId.forall(_.nonEmpty),
+ "targetEntityId must not be an empty string.")
+ require(e.targetEntityType.isDefined == e.targetEntityId.isDefined,
+ "targetEntityType and targetEntityId must be specified together.")
+ require(!((e.event == "$unset") && e.properties.isEmpty),
+ "properties cannot be empty for $unset event")
+ require(!isReservedPrefix(e.event) || isSpecialEvents(e.event),
+ s"${e.event} is not a supported reserved event name.")
+ require(!isSpecialEvents(e.event) ||
+ (e.targetEntityType.isEmpty && e.targetEntityId.isEmpty),
+ s"Reserved event ${e.event} cannot have targetEntity")
+ require(!isReservedPrefix(e.entityType) ||
+ isBuiltinEntityTypes(e.entityType),
+ s"The entityType ${e.entityType} is not allowed. " +
+ s"'pio_' is a reserved name prefix.")
+ require(e.targetEntityType.map{ t =>
+ (!isReservedPrefix(t) || isBuiltinEntityTypes(t))}.getOrElse(true),
+ s"The targetEntityType ${e.targetEntityType.get} is not allowed. " +
+ s"'pio_' is a reserved name prefix.")
+ validateProperties(e)
+ }
+
+ /** Defines built-in entity types. The current built-in type is pio_pr. */
+ val builtinEntityTypes: Set[String] = Set("pio_pr")
+
+ /** Defines built-in properties. This is currently empty. */
+ val builtinProperties: Set[String] = Set()
+
+ /** Checks whether an entity type is a built-in entity type */
+ def isBuiltinEntityTypes(name: String): Boolean = builtinEntityTypes.contains(name)
+
+ /** Validate event properties, throwing exceptions when the candidate violates
+ * any of the following:
+ *
+ * - property name must not contain a reserved prefix
+ *
+ * @param e Event to be validated
+ */
+ def validateProperties(e: Event): Unit = {
+ e.properties.keySet.foreach { k =>
+ require(!isReservedPrefix(k) || builtinProperties.contains(k),
+ s"The property ${k} is not allowed. " +
+ s"'pio_' is a reserved name prefix.")
+ }
+ }
+
+}
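
For illustration, a sketch of using the Event and EventValidation APIs above, assuming DataMap's JSON-string constructor from the accompanying DataMap.scala:

    import org.apache.predictionio.data.storage.{DataMap, Event, EventValidation}
    import org.joda.time.DateTime

    val e = Event(
      event = "$set",
      entityType = "user",
      entityId = "u1",
      properties = DataMap("""{"age": 21}"""),
      eventTime = new DateTime(2015, 1, 1, 0, 0))

    EventValidation.validate(e)  // passes: $set is a reserved special event

    // throws IllegalArgumentException: reserved events cannot have a target entity
    // EventValidation.validate(e.copy(
    //   targetEntityType = Some("item"), targetEntityId = Some("i1")))
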
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala
new file mode 100644
index 0000000..7d4fce3
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala
@@ -0,0 +1,236 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.data.{Utils => DataUtils}
+import org.joda.time.DateTime
+import org.json4s._
+import scala.util.{Try, Success, Failure}
+
+/** :: DeveloperApi ::
+ * Support library for dealing with [[Event]] and JSON4S
+ *
+ * @group Event Data
+ */
+@DeveloperApi
+object EventJson4sSupport {
+ /** This is set to org.json4s.DefaultFormats. Do not use JSON4S to serialize
+ * or deserialize Joda-Time DateTime because it has some issues with timezone
+ * (as of version 3.2.10)
+ */
+ implicit val formats = DefaultFormats
+
+ /** :: DeveloperApi ::
+ * Convert JSON from Event Server to [[Event]]
+ *
+ * @return deserialization routine used by [[APISerializer]]
+ */
+ @DeveloperApi
+ def readJson: PartialFunction[JValue, Event] = {
+ case JObject(x) => {
+ val fields = new DataMap(x.toMap)
+ // use get() for fields that are required in the JSON
+ // use getOpt() for optional fields
+ try {
+ val event = fields.get[String]("event")
+ val entityType = fields.get[String]("entityType")
+ val entityId = fields.get[String]("entityId")
+ val targetEntityType = fields.getOpt[String]("targetEntityType")
+ val targetEntityId = fields.getOpt[String]("targetEntityId")
+ val properties = fields.getOrElse[Map[String, JValue]](
+ "properties", Map())
+ // default currentTime expressed as UTC timezone
+ lazy val currentTime = DateTime.now(EventValidation.defaultTimeZone)
+ val eventTime = fields.getOpt[String]("eventTime")
+ .map{ s =>
+ try {
+ DataUtils.stringToDateTime(s)
+ } catch {
+ case _: Exception =>
+ throw new MappingException(s"Fail to extract eventTime ${s}")
+ }
+ }.getOrElse(currentTime)
+
+ // disable tags from API for now.
+ val tags = List()
+ // val tags = fields.getOpt[Seq[String]]("tags").getOrElse(List())
+
+ val prId = fields.getOpt[String]("prId")
+
+ // don't allow user set creationTime from API for now.
+ val creationTime = currentTime
+ // val creationTime = fields.getOpt[String]("creationTime")
+ // .map{ s =>
+ // try {
+ // DataUtils.stringToDateTime(s)
+ // } catch {
+ // case _: Exception =>
+ // throw new MappingException(s"Failed to extract creationTime ${s}")
+ // }
+ // }.getOrElse(currentTime)
+
+
+ val newEvent = Event(
+ event = event,
+ entityType = entityType,
+ entityId = entityId,
+ targetEntityType = targetEntityType,
+ targetEntityId = targetEntityId,
+ properties = DataMap(properties),
+ eventTime = eventTime,
+ prId = prId,
+ creationTime = creationTime
+ )
+ EventValidation.validate(newEvent)
+ newEvent
+ } catch {
+ case e: Exception => throw new MappingException(e.toString, e)
+ }
+ }
+ }
+
+ /** :: DeveloperApi ::
+ * Convert [[Event]] to JSON for use by the Event Server
+ *
+ * @return serialization routine used by [[APISerializer]]
+ */
+ @DeveloperApi
+ def writeJson: PartialFunction[Any, JValue] = {
+ case d: Event => {
+ JObject(
+ JField("eventId",
+ d.eventId.map( eid => JString(eid)).getOrElse(JNothing)) ::
+ JField("event", JString(d.event)) ::
+ JField("entityType", JString(d.entityType)) ::
+ JField("entityId", JString(d.entityId)) ::
+ JField("targetEntityType",
+ d.targetEntityType.map(JString(_)).getOrElse(JNothing)) ::
+ JField("targetEntityId",
+ d.targetEntityId.map(JString(_)).getOrElse(JNothing)) ::
+ JField("properties", d.properties.toJObject) ::
+ JField("eventTime", JString(DataUtils.dateTimeToString(d.eventTime))) ::
+ // disable tags from API for now
+ // JField("tags", JArray(d.tags.toList.map(JString(_)))) ::
+ JField("prId",
+ d.prId.map(JString(_)).getOrElse(JNothing)) ::
+ // don't show creationTime for now
+ JField("creationTime",
+ JString(DataUtils.dateTimeToString(d.creationTime))) ::
+ Nil)
+ }
+ }
+
+ /** :: DeveloperApi ::
+ * Convert JSON4S JValue to [[Event]]
+ *
+ * @return deserialization routine used by [[DBSerializer]]
+ */
+ @DeveloperApi
+ def deserializeFromJValue: PartialFunction[JValue, Event] = {
+ case jv: JValue => {
+ val event = (jv \ "event").extract[String]
+ val entityType = (jv \ "entityType").extract[String]
+ val entityId = (jv \ "entityId").extract[String]
+ val targetEntityType = (jv \ "targetEntityType").extract[Option[String]]
+ val targetEntityId = (jv \ "targetEntityId").extract[Option[String]]
+ val properties = (jv \ "properties").extract[JObject]
+ val eventTime = DataUtils.stringToDateTime(
+ (jv \ "eventTime").extract[String])
+ val tags = (jv \ "tags").extract[Seq[String]]
+ val prId = (jv \ "prId").extract[Option[String]]
+ val creationTime = DataUtils.stringToDateTime(
+ (jv \ "creationTime").extract[String])
+ Event(
+ event = event,
+ entityType = entityType,
+ entityId = entityId,
+ targetEntityType = targetEntityType,
+ targetEntityId = targetEntityId,
+ properties = DataMap(properties),
+ eventTime = eventTime,
+ tags = tags,
+ prId = prId,
+ creationTime = creationTime)
+ }
+ }
+
+ /** :: DeveloperApi ::
+ * Convert [[Event]] to JSON4S JValue
+ *
+ * @return serialization routine used by [[DBSerializer]]
+ */
+ @DeveloperApi
+ def serializeToJValue: PartialFunction[Any, JValue] = {
+ case d: Event => {
+ JObject(
+ JField("event", JString(d.event)) ::
+ JField("entityType", JString(d.entityType)) ::
+ JField("entityId", JString(d.entityId)) ::
+ JField("targetEntityType",
+ d.targetEntityType.map(JString(_)).getOrElse(JNothing)) ::
+ JField("targetEntityId",
+ d.targetEntityId.map(JString(_)).getOrElse(JNothing)) ::
+ JField("properties", d.properties.toJObject) ::
+ JField("eventTime", JString(DataUtils.dateTimeToString(d.eventTime))) ::
+ JField("tags", JArray(d.tags.toList.map(JString(_)))) ::
+ JField("prId",
+ d.prId.map(JString(_)).getOrElse(JNothing)) ::
+ JField("creationTime",
+ JString(DataUtils.dateTimeToString(d.creationTime))) ::
+ Nil)
+ }
+ }
+
+ /** :: DeveloperApi ::
+ * Custom JSON4S serializer for [[Event]] intended to be used by database
+ * access, or anywhere that demands serdes of [[Event]] to/from JSON4S JValue
+ */
+ @DeveloperApi
+ class DBSerializer extends CustomSerializer[Event](format => (
+ deserializeFromJValue, serializeToJValue))
+
+ /** :: DeveloperApi ::
+ * Custom JSON4S serializer for [[Event]] intended to be used by the Event
+ * Server, or anywhere that demands serdes of [[Event]] to/from JSON
+ */
+ @DeveloperApi
+ class APISerializer extends CustomSerializer[Event](format => (
+ readJson, writeJson))
+}
+
+
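+/** :: DeveloperApi ::
+ * Support library for dealing with batches of [[Event]]s and JSON4S
+ *
+ * @group Event Data
+ */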
+@DeveloperApi
+object BatchEventsJson4sSupport {
+ implicit val formats = DefaultFormats
+
+ @DeveloperApi
+ def readJson: PartialFunction[JValue, Seq[Try[Event]]] = {
+ case JArray(events) => {
+ events.map { event =>
+ try {
+ Success(EventJson4sSupport.readJson(event))
+ } catch {
+ case e: Exception => Failure(e)
+ }
+ }
+ }
+ }
+
+ @DeveloperApi
+ class APISerializer extends CustomSerializer[Seq[Try[Event]]](format => (readJson, Map.empty))
+}
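
For illustration, a sketch of round-tripping an Event through APISerializer using standard JSON4S calls:

    import org.json4s._
    import org.json4s.native.JsonMethods._
    import org.apache.predictionio.data.storage.{Event, EventJson4sSupport}

    implicit val formats: Formats =
      DefaultFormats + new EventJson4sSupport.APISerializer

    val json = """{
      "event": "$set",
      "entityType": "user",
      "entityId": "u1",
      "properties": {"age": 21},
      "eventTime": "2015-01-01T00:00:00.000Z"
    }"""

    val event = parse(json).extract[Event]    // goes through readJson
    val jv = Extraction.decompose(event)      // goes through writeJson
    println(compact(render(jv)))
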
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/LEventAggregator.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/LEventAggregator.scala b/data/src/main/scala/org/apache/predictionio/data/storage/LEventAggregator.scala
new file mode 100644
index 0000000..6836c6d
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/LEventAggregator.scala
@@ -0,0 +1,145 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.joda.time.DateTime
+
+/** :: DeveloperApi ::
+ * Provides aggregation support of [[Event]]s to [[LEvents]]. Engine developers
+ * should use [[org.apache.predictionio.data.store.LEventStore]] instead of using this
+ * directly.
+ *
+ * @group Event Data
+ */
+@DeveloperApi
+object LEventAggregator {
+ /** :: DeveloperApi ::
+ * Aggregate properties from an iterator of [[Event]]s, grouped by entity ID,
+ * keeping the latest value of each property together with the first and
+ * last updated times
+ *
+ * @param events An iterator of [[Event]]s whose properties will be aggregated
+ * @return A map of entity ID to [[PropertyMap]]
+ */
+ @DeveloperApi
+ def aggregateProperties(events: Iterator[Event]): Map[String, PropertyMap] = {
+ events.toList
+ .groupBy(_.entityId)
+ .mapValues(_.sortBy(_.eventTime.getMillis)
+ .foldLeft[Prop](Prop())(propAggregator))
+ .filter{ case (k, v) => v.dm.isDefined }
+ .mapValues{ v =>
+ require(v.firstUpdated.isDefined,
+ "Unexpected Error: firstUpdated cannot be None.")
+ require(v.lastUpdated.isDefined,
+ "Unexpected Error: lastUpdated cannot be None.")
+
+ PropertyMap(
+ fields = v.dm.get.fields,
+ firstUpdated = v.firstUpdated.get,
+ lastUpdated = v.lastUpdated.get
+ )
+ }
+ }
+
+ /** :: DeveloperApi ::
+ * Aggregate properties from an iterator of [[Event]]s of a single entity,
+ * keeping the latest property values together with the first and last
+ * updated times
+ *
+ * @param events An iterator of [[Event]]s whose properties will be aggregated
+ * @return An optional [[PropertyMap]]
+ */
+ @DeveloperApi
+ def aggregatePropertiesSingle(events: Iterator[Event])
+ : Option[PropertyMap] = {
+ val prop = events.toList
+ .sortBy(_.eventTime.getMillis)
+ .foldLeft[Prop](Prop())(propAggregator)
+
+ prop.dm.map{ d =>
+ require(prop.firstUpdated.isDefined,
+ "Unexpected Error: firstUpdated cannot be None.")
+ require(prop.lastUpdated.isDefined,
+ "Unexpected Error: lastUpdated cannot be None.")
+
+ PropertyMap(
+ fields = d.fields,
+ firstUpdated = prop.firstUpdated.get,
+ lastUpdated = prop.lastUpdated.get
+ )
+ }
+ }
+
+ /** Event names that control aggregation: \$set, \$unset, and \$delete */
+ val eventNames = List("$set", "$unset", "$delete")
+
+ private
+ def dataMapAggregator: ((Option[DataMap], Event) => Option[DataMap]) = {
+ (p, e) => {
+ e.event match {
+ case "$set" => {
+ if (p == None) {
+ Some(e.properties)
+ } else {
+ p.map(_ ++ e.properties)
+ }
+ }
+ case "$unset" => {
+ if (p == None) {
+ None
+ } else {
+ p.map(_ -- e.properties.keySet)
+ }
+ }
+ case "$delete" => None
+ case _ => p // do nothing for others
+ }
+ }
+ }
+
+ private
+ def propAggregator: ((Prop, Event) => Prop) = {
+ (p, e) => {
+ e.event match {
+ case "$set" | "$unset" | "$delete" => {
+ Prop(
+ dm = dataMapAggregator(p.dm, e),
+ firstUpdated = p.firstUpdated.map { t =>
+ first(t, e.eventTime)
+ }.orElse(Some(e.eventTime)),
+ lastUpdated = p.lastUpdated.map { t =>
+ last(t, e.eventTime)
+ }.orElse(Some(e.eventTime))
+ )
+ }
+ case _ => p // do nothing for others
+ }
+ }
+ }
+
+ private
+ def first(a: DateTime, b: DateTime): DateTime = if (b.isBefore(a)) b else a
+
+ private
+ def last(a: DateTime, b: DateTime): DateTime = if (b.isAfter(a)) b else a
+
+ private case class Prop(
+ dm: Option[DataMap] = None,
+ firstUpdated: Option[DateTime] = None,
+ lastUpdated: Option[DateTime] = None
+ )
+}
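
A sketch of the aggregation semantics (the latest $set wins and $unset removes keys), again assuming DataMap's JSON-string constructor:

    import org.apache.predictionio.data.storage.{DataMap, Event, LEventAggregator}
    import org.joda.time.DateTime

    val t0 = new DateTime(2015, 1, 1, 0, 0)
    val events = Iterator(
      Event(event = "$set", entityType = "user", entityId = "u1",
        properties = DataMap("""{"a": 1, "b": 2}"""), eventTime = t0),
      Event(event = "$unset", entityType = "user", entityId = "u1",
        properties = DataMap("""{"b": null}"""), eventTime = t0.plusDays(1)))

    val props = LEventAggregator.aggregateProperties(events)
    // props("u1") now holds only "a";
    // firstUpdated == t0 and lastUpdated == t0.plusDays(1)
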
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
new file mode 100644
index 0000000..d6e753c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
@@ -0,0 +1,489 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.annotation.Experimental
+
+import scala.concurrent.Future
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext
+import scala.concurrent.TimeoutException
+
+import org.joda.time.DateTime
+
+/** :: DeveloperApi ::
+ * Base trait of a data access object that directly returns [[Event]] without
+ * going through Spark's parallelization. Engine developers should use
+ * [[org.apache.predictionio.data.store.LEventStore]] instead of using this directly.
+ *
+ * @group Event Data
+ */
+@DeveloperApi
+trait LEvents {
+ /** Default timeout for asynchronous operations, set to 1 minute */
+ val defaultTimeout = Duration(60, "seconds")
+
+ /** :: DeveloperApi ::
+ * Initialize Event Store for an app ID and optionally a channel ID.
+ * This routine is to be called when an app is first created.
+ *
+ * @param appId App ID
+ * @param channelId Optional channel ID
+ * @return true if initialization was successful; false otherwise.
+ */
+ @DeveloperApi
+ def init(appId: Int, channelId: Option[Int] = None): Boolean
+
+ /** :: DeveloperApi ::
+ * Remove Event Store for an app ID and optional channel ID.
+ *
+ * @param appId App ID
+ * @param channelId Optional channel ID
+ * @return true if removal was successful; false otherwise.
+ */
+ @DeveloperApi
+ def remove(appId: Int, channelId: Option[Int] = None): Boolean
+
+ /** :: DeveloperApi ::
+ * Close this Event Store interface object, e.g. close connection, release
+ * resources, etc.
+ */
+ @DeveloperApi
+ def close(): Unit
+
+ /** :: DeveloperApi ::
+ * Insert an [[Event]] in a non-blocking fashion.
+ *
+ * @param event An [[Event]] to be inserted
+ * @param appId App ID for the [[Event]] to be inserted to
+ */
+ @DeveloperApi
+ def futureInsert(event: Event, appId: Int)(implicit ec: ExecutionContext):
+ Future[String] = futureInsert(event, appId, None)
+
+ /** :: DeveloperApi ::
+ * Insert an [[Event]] in a non-blocking fashion.
+ *
+ * @param event An [[Event]] to be inserted
+ * @param appId App ID for the [[Event]] to be inserted to
+ * @param channelId Optional channel ID for the [[Event]] to be inserted to
+ */
+ @DeveloperApi
+ def futureInsert(
+ event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[String]
+
+ /** :: DeveloperApi ::
+ * Get an [[Event]] in a non-blocking fashion.
+ *
+ * @param eventId ID of the [[Event]]
+ * @param appId ID of the app that contains the [[Event]]
+ */
+ @DeveloperApi
+ def futureGet(eventId: String, appId: Int)(implicit ec: ExecutionContext):
+ Future[Option[Event]] = futureGet(eventId, appId, None)
+
+ /** :: DeveloperApi ::
+ * Get an [[Event]] in a non-blocking fashion.
+ *
+ * @param eventId ID of the [[Event]]
+ * @param appId ID of the app that contains the [[Event]]
+ * @param channelId Optional channel ID that contains the [[Event]]
+ */
+ @DeveloperApi
+ def futureGet(
+ eventId: String,
+ appId: Int,
+ channelId: Option[Int]
+ )(implicit ec: ExecutionContext): Future[Option[Event]]
+
+ /** :: DeveloperApi ::
+ * Delete an [[Event]] in a non-blocking fashion.
+ *
+ * @param eventId ID of the [[Event]]
+ * @param appId ID of the app that contains the [[Event]]
+ */
+ @DeveloperApi
+ def futureDelete(eventId: String, appId: Int)(implicit ec: ExecutionContext):
+ Future[Boolean] = futureDelete(eventId, appId, None)
+
+ /** :: DeveloperApi ::
+ * Delete an [[Event]] in a non-blocking fashion.
+ *
+ * @param eventId ID of the [[Event]]
+ * @param appId ID of the app that contains the [[Event]]
+ * @param channelId Optional channel ID that contains the [[Event]]
+ */
+ @DeveloperApi
+ def futureDelete(
+ eventId: String,
+ appId: Int,
+ channelId: Option[Int]
+ )(implicit ec: ExecutionContext): Future[Boolean]
+
+ /** :: DeveloperApi ::
+ * Reads from database and returns a Future of Iterator of [[Event]]s.
+ *
+ * @param appId return events of this app ID
+ * @param channelId return events of this channel ID (default channel if it's None)
+ * @param startTime return events with eventTime >= startTime
+ * @param untilTime return events with eventTime < untilTime
+ * @param entityType return events of this entityType
+ * @param entityId return events of this entityId
+ * @param eventNames return events with any of these event names.
+ * @param targetEntityType return events of this targetEntityType:
+ * - None means no restriction on targetEntityType
+ * - Some(None) means no targetEntityType for this event
+ * - Some(Some(x)) means targetEntityType should match x.
+ * @param targetEntityId return events of this targetEntityId
+ * - None means no restriction on targetEntityId
+ * - Some(None) means no targetEntityId for this event
+ * - Some(Some(x)) means targetEntityId should match x.
+ * @param limit Limit number of events. Get all events if None or Some(-1)
+ * @param reversed Reverse the order.
+ * - return oldest events first if None or Some(false) (default)
+ * - return latest events first if Some(true)
+ * @param ec ExecutionContext
+ * @return Future[Iterator[Event]]
+ */
+ @DeveloperApi
+ def futureFind(
+ appId: Int,
+ channelId: Option[Int] = None,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ entityType: Option[String] = None,
+ entityId: Option[String] = None,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None,
+ limit: Option[Int] = None,
+ reversed: Option[Boolean] = None
+ )(implicit ec: ExecutionContext): Future[Iterator[Event]]
+
+ /** Aggregate properties of entities based on the special events
+ * \$set, \$unset, and \$delete, and return a Future of a Map of
+ * entityId to properties.
+ *
+ * @param appId use events of this app ID
+ * @param channelId use events of this channel ID (default channel if it's None)
+ * @param entityType aggregate properties of the entities of this entityType
+ * @param startTime use events with eventTime >= startTime
+ * @param untilTime use events with eventTime < untilTime
+ * @param required only keep entities with these required properties defined
+ * @param ec ExecutionContext
+ * @return Future[Map[String, PropertyMap]]
+ */
+ private[predictionio] def futureAggregateProperties(
+ appId: Int,
+ channelId: Option[Int] = None,
+ entityType: String,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ required: Option[Seq[String]] = None)(implicit ec: ExecutionContext):
+ Future[Map[String, PropertyMap]] = {
+ futureFind(
+ appId = appId,
+ channelId = channelId,
+ startTime = startTime,
+ untilTime = untilTime,
+ entityType = Some(entityType),
+ eventNames = Some(LEventAggregator.eventNames)
+ ).map{ eventIt =>
+ val dm = LEventAggregator.aggregateProperties(eventIt)
+ if (required.isDefined) {
+ dm.filter { case (k, v) =>
+ required.get.map(v.contains(_)).reduce(_ && _)
+ }
+ } else dm
+ }
+ }
+
+ /**
+ * :: Experimental ::
+ *
+ * Aggregate properties of the specified entity (entityType + entityId)
+ * based on the special events \$set, \$unset, and \$delete, and return
+ * a Future of Option[PropertyMap].
+ *
+ * @param appId use events of this app ID
+ * @param channelId use events of this channel ID (default channel if it's None)
+ * @param entityType the entityType
+ * @param entityId the entityId
+ * @param startTime use events with eventTime >= startTime
+ * @param untilTime use events with eventTime < untilTime
+ * @param ec ExecutionContext
+ * @return Future[Option[PropertyMap]]
+ */
+ @Experimental
+ private[predictionio] def futureAggregatePropertiesOfEntity(
+ appId: Int,
+ channelId: Option[Int] = None,
+ entityType: String,
+ entityId: String,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None)(implicit ec: ExecutionContext):
+ Future[Option[PropertyMap]] = {
+ futureFind(
+ appId = appId,
+ channelId = channelId,
+ startTime = startTime,
+ untilTime = untilTime,
+ entityType = Some(entityType),
+ entityId = Some(entityId),
+ eventNames = Some(LEventAggregator.eventNames)
+ ).map{ eventIt =>
+ LEventAggregator.aggregatePropertiesSingle(eventIt)
+ }
+ }
+
+ // The following methods are blocking counterparts of the futures above.
+ private[predictionio] def insert(event: Event, appId: Int,
+ channelId: Option[Int] = None,
+ timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+ String = {
+ Await.result(futureInsert(event, appId, channelId), timeout)
+ }
+
+ private[predictionio] def get(eventId: String, appId: Int,
+ channelId: Option[Int] = None,
+ timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+ Option[Event] = {
+ Await.result(futureGet(eventId, appId, channelId), timeout)
+ }
+
+ private[predictionio] def delete(eventId: String, appId: Int,
+ channelId: Option[Int] = None,
+ timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+ Boolean = {
+ Await.result(futureDelete(eventId, appId, channelId), timeout)
+ }
+
+ /** Reads from the database and returns an iterator of events.
+ *
+ * @param appId return events of this app ID
+ * @param channelId return events of this channel ID (default channel if it's None)
+ * @param startTime return events with eventTime >= startTime
+ * @param untilTime return events with eventTime < untilTime
+ * @param entityType return events of this entityType
+ * @param entityId return events of this entityId
+ * @param eventNames return events with any of these event names.
+ * @param targetEntityType return events of this targetEntityType:
+ * - None means no restriction on targetEntityType
+ * - Some(None) means no targetEntityType for this event
+ * - Some(Some(x)) means targetEntityType should match x.
+ * @param targetEntityId return events of this targetEntityId
+ * - None means no restriction on targetEntityId
+ * - Some(None) means no targetEntityId for this event
+ * - Some(Some(x)) means targetEntityId should match x.
+ * @param limit Limit number of events. Get all events if None or Some(-1)
+ * @param reversed Reverse the order (should be used with both
+ * targetEntityType and targetEntityId specified)
+ * - return oldest events first if None or Some(false) (default)
+ * - return latest events first if Some(true)
+ * @param ec ExecutionContext
+ * @return Iterator[Event]
+ */
+ private[predictionio] def find(
+ appId: Int,
+ channelId: Option[Int] = None,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ entityType: Option[String] = None,
+ entityId: Option[String] = None,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None,
+ limit: Option[Int] = None,
+ reversed: Option[Boolean] = None,
+ timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+ Iterator[Event] = {
+ Await.result(futureFind(
+ appId = appId,
+ channelId = channelId,
+ startTime = startTime,
+ untilTime = untilTime,
+ entityType = entityType,
+ entityId = entityId,
+ eventNames = eventNames,
+ targetEntityType = targetEntityType,
+ targetEntityId = targetEntityId,
+ limit = limit,
+ reversed = reversed), timeout)
+ }
+
+ // NOTE: remove in next release
+ @deprecated("Use find() instead.", "0.9.2")
+ private[predictionio] def findLegacy(
+ appId: Int,
+ channelId: Option[Int] = None,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ entityType: Option[String] = None,
+ entityId: Option[String] = None,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None,
+ limit: Option[Int] = None,
+ reversed: Option[Boolean] = None,
+ timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+ Either[StorageError, Iterator[Event]] = {
+ try {
+ // return Either for legacy usage
+ Right(Await.result(futureFind(
+ appId = appId,
+ channelId = channelId,
+ startTime = startTime,
+ untilTime = untilTime,
+ entityType = entityType,
+ entityId = entityId,
+ eventNames = eventNames,
+ targetEntityType = targetEntityType,
+ targetEntityId = targetEntityId,
+ limit = limit,
+ reversed = reversed), timeout))
+ } catch {
+ case e: TimeoutException => Left(StorageError(s"${e}"))
+ case e: Exception => Left(StorageError(s"${e}"))
+ }
+ }
+
+ /** Reads events of the specified entity.
+ *
+ * @param appId return events of this app ID
+ * @param channelId return events of this channel ID (default channel if it's None)
+ * @param entityType return events of this entityType
+ * @param entityId return events of this entityId
+ * @param eventNames return events with any of these event names.
+ * @param targetEntityType return events of this targetEntityType:
+ * - None means no restriction on targetEntityType
+ * - Some(None) means no targetEntityType for this event
+ * - Some(Some(x)) means targetEntityType should match x.
+ * @param targetEntityId return events of this targetEntityId
+ * - None means no restriction on targetEntityId
+ * - Some(None) means no targetEntityId for this event
+ * - Some(Some(x)) means targetEntityId should match x.
+ * @param startTime return events with eventTime >= startTime
+ * @param untilTime return events with eventTime < untilTime
+ * @param limit Limit number of events. Get all events if None or Some(-1)
+ * @param latest Return latest event first (default true)
+ * @param ec ExecutionContext
+ * @return Either[StorageError, Iterator[Event]]
+ */
+ // NOTE: remove this function in next release
+ @deprecated("Use LEventStore.findByEntity() instead.", "0.9.2")
+ def findSingleEntity(
+ appId: Int,
+ channelId: Option[Int] = None,
+ entityType: String,
+ entityId: String,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ limit: Option[Int] = None,
+ latest: Boolean = true,
+ timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+ Either[StorageError, Iterator[Event]] = {
+
+ findLegacy(
+ appId = appId,
+ channelId = channelId,
+ startTime = startTime,
+ untilTime = untilTime,
+ entityType = Some(entityType),
+ entityId = Some(entityId),
+ eventNames = eventNames,
+ targetEntityType = targetEntityType,
+ targetEntityId = targetEntityId,
+ limit = limit,
+ reversed = Some(latest),
+ timeout = timeout)
+
+ }
+
+ /** Aggregate properties of entities based on the special events
+ * \$set, \$unset, and \$delete, and return a Map of entityId to
+ * properties.
+ *
+ * @param appId use events of this app ID
+ * @param channelId use events of this channel ID (default channel if it's None)
+ * @param entityType aggregate properties of the entities of this entityType
+ * @param startTime use events with eventTime >= startTime
+ * @param untilTime use events with eventTime < untilTime
+ * @param required only keep entities with these required properties defined
+ * @param ec ExecutionContext
+ * @return Map[String, PropertyMap]
+ */
+ private[predictionio] def aggregateProperties(
+ appId: Int,
+ channelId: Option[Int] = None,
+ entityType: String,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ required: Option[Seq[String]] = None,
+ timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+ Map[String, PropertyMap] = {
+ Await.result(futureAggregateProperties(
+ appId = appId,
+ channelId = channelId,
+ entityType = entityType,
+ startTime = startTime,
+ untilTime = untilTime,
+ required = required), timeout)
+ }
+
+ /**
+ * :: Experimental ::
+ *
+ * Aggregate properties of the specified entity (entityType + entityId)
+ * based on the special events \$set, \$unset, and \$delete, and return
+ * Option[PropertyMap].
+ *
+ * @param appId use events of this app ID
+ * @param channelId use events of this channel ID
+ * @param entityType the entityType
+ * @param entityId the entityId
+ * @param startTime use events with eventTime >= startTime
+ * @param untilTime use events with eventTime < untilTime
+ * @param ec ExecutionContext
+ * @return Option[PropertyMap]
+ */
+ @Experimental
+ private[predictionio] def aggregatePropertiesOfEntity(
+ appId: Int,
+ channelId: Option[Int] = None,
+ entityType: String,
+ entityId: String,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
+ Option[PropertyMap] = {
+
+ Await.result(futureAggregatePropertiesOfEntity(
+ appId = appId,
+ channelId = channelId,
+ entityType = entityType,
+ entityId = entityId,
+ startTime = startTime,
+ untilTime = untilTime), timeout)
+ }
+
+}
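
To make the nested Option convention for target entities concrete, a hedged sketch of a blocking futureFind call; `lEvents` here is a hypothetical concrete LEvents implementation obtained from the storage layer:

    import scala.concurrent.Await
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // up to 100 "$set" events on user entities that carry no target entity
    val events = Await.result(
      lEvents.futureFind(
        appId = 1,
        entityType = Some("user"),
        eventNames = Some(Seq("$set")),
        targetEntityType = Some(None),  // Some(None): event has no targetEntityType
        targetEntityId = Some(None),    // plain None would mean "no restriction"
        limit = Some(100)),
      10.seconds)
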
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Models.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Models.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Models.scala
new file mode 100644
index 0000000..15d7444
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Models.scala
@@ -0,0 +1,80 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import com.google.common.io.BaseEncoding
+import org.apache.predictionio.annotation.DeveloperApi
+import org.json4s._
+
+/** :: DeveloperApi ::
+ * Stores model for each engine instance
+ *
+ * @param id ID of the model, which should be the same as engine instance ID
+ * @param models Trained models of all algorithms
+ * @group Model Data
+ */
+@DeveloperApi
+case class Model(
+ id: String,
+ models: Array[Byte])
+
+/** :: DeveloperApi ::
+ * Base trait of the [[Model]] data access object
+ *
+ * @group Model Data
+ */
+@DeveloperApi
+trait Models {
+ /** Insert a new [[Model]] */
+ def insert(i: Model): Unit
+
+ /** Get a [[Model]] by ID */
+ def get(id: String): Option[Model]
+
+ /** Delete a [[Model]] */
+ def delete(id: String): Unit
+}
+
+/** :: DeveloperApi ::
+ * JSON4S serializer for [[Model]]
+ *
+ * @group Model Data
+ */
+@DeveloperApi
+class ModelSerializer extends CustomSerializer[Model](
+ format => ({
+ case JObject(fields) =>
+ implicit val formats = DefaultFormats
+ val seed = Model(
+ id = "",
+ models = Array[Byte]())
+ fields.foldLeft(seed) { case (i, field) =>
+ field match {
+ case JField("id", JString(id)) => i.copy(id = id)
+ case JField("models", JString(models)) =>
+ i.copy(models = BaseEncoding.base64.decode(models))
+ case _ => i
+ }
+ }
+ },
+ {
+ case i: Model =>
+ JObject(
+ JField("id", JString(i.id)) ::
+ JField("models", JString(BaseEncoding.base64.encode(i.models))) ::
+ Nil)
+ }
+))
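
A round-trip sketch of ModelSerializer with standard JSON4S serialization; the Base64 literal in the comment is simply what Array[Byte](1, 2, 3) encodes to:

    import org.json4s._
    import org.json4s.native.Serialization
    import org.apache.predictionio.data.storage.{Model, ModelSerializer}

    implicit val formats: Formats = DefaultFormats + new ModelSerializer

    val m = Model(id = "engine-instance-1", models = Array[Byte](1, 2, 3))
    val js = Serialization.write(m)   // {"id":"engine-instance-1","models":"AQID"}
    val back = Serialization.read[Model](js)
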
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala b/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala
new file mode 100644
index 0000000..72287dd
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala
@@ -0,0 +1,209 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.joda.time.DateTime
+
+import org.json4s.JValue
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+// each JValue is stored together with the time at which it was set
+private[predictionio] case class PropTime(d: JValue, t: Long)
+ extends Serializable
+
+private[predictionio] case class SetProp (
+ fields: Map[String, PropTime],
+ // last set time. Note: fields could be empty with a valid set time
+ t: Long) extends Serializable {
+
+ def ++ (that: SetProp): SetProp = {
+ val commonKeys = fields.keySet.intersect(that.fields.keySet)
+
+ val common: Map[String, PropTime] = commonKeys.map { k =>
+ val thisData = this.fields(k)
+ val thatData = that.fields(k)
+ // only keep the value with latest time
+ val v = if (thisData.t > thatData.t) thisData else thatData
+ (k, v)
+ }.toMap
+
+ val combinedFields = common ++
+ (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
+
+ // keep the latest set time
+ val combinedT = if (this.t > that.t) this.t else that.t
+
+ SetProp(
+ fields = combinedFields,
+ t = combinedT
+ )
+ }
+}
+
+private[predictionio] case class UnsetProp (fields: Map[String, Long])
+ extends Serializable {
+ def ++ (that: UnsetProp): UnsetProp = {
+ val commonKeys = fields.keySet.intersect(that.fields.keySet)
+
+ val common: Map[String, Long] = commonKeys.map { k =>
+ val thisData = this.fields(k)
+ val thatData = that.fields(k)
+ // only keep the value with latest time
+ val v = if (thisData > thatData) thisData else thatData
+ (k, v)
+ }.toMap
+
+ val combinedFields = common ++
+ (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
+
+ UnsetProp(
+ fields = combinedFields
+ )
+ }
+}
+
+private[predictionio] case class DeleteEntity (t: Long) extends Serializable {
+ def ++ (that: DeleteEntity): DeleteEntity = {
+ if (this.t > that.t) this else that
+ }
+}
+
+private[predictionio] case class EventOp (
+ setProp: Option[SetProp] = None,
+ unsetProp: Option[UnsetProp] = None,
+ deleteEntity: Option[DeleteEntity] = None,
+ firstUpdated: Option[DateTime] = None,
+ lastUpdated: Option[DateTime] = None
+) extends Serializable {
+
+ def ++ (that: EventOp): EventOp = {
+ val firstUp = (this.firstUpdated ++ that.firstUpdated).reduceOption{
+ (a, b) => if (b.getMillis < a.getMillis) b else a
+ }
+ val lastUp = (this.lastUpdated ++ that.lastUpdated).reduceOption {
+ (a, b) => if (b.getMillis > a.getMillis) b else a
+ }
+
+ EventOp(
+ setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
+ unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
+ deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _),
+ firstUpdated = firstUp,
+ lastUpdated = lastUp
+ )
+ }
+
+ def toPropertyMap(): Option[PropertyMap] = {
+ setProp.flatMap { set =>
+
+ val unsetKeys: Set[String] = unsetProp.map( unset =>
+ // use get() to guard against keys that were unset but never set
+ unset.fields.filter { case (k, v) =>
+ set.fields.get(k).forall(pt => v >= pt.t) }.keySet
+ ).getOrElse(Set())
+
+ val combinedFields = deleteEntity.map { delete =>
+ if (delete.t >= set.t) {
+ None
+ } else {
+ val deleteKeys: Set[String] = set.fields
+ .filter { case (k, PropTime(kv, t)) =>
+ (delete.t >= t)
+ }.keySet
+ Some(set.fields -- unsetKeys -- deleteKeys)
+ }
+ }.getOrElse{
+ Some(set.fields -- unsetKeys)
+ }
+
+ // Note: mapValues() doesn't return concrete Map and causes
+ // NotSerializableException issue. Use map(identity) to work around this.
+ // see https://issues.scala-lang.org/browse/SI-7005
+ combinedFields.map{ f =>
+ require(firstUpdated.isDefined,
+ "Unexpected Error: firstUpdated cannot be None.")
+ require(lastUpdated.isDefined,
+ "Unexpected Error: lastUpdated cannot be None.")
+ PropertyMap(
+ fields = f.mapValues(_.d).map(identity),
+ firstUpdated = firstUpdated.get,
+ lastUpdated = lastUpdated.get
+ )
+ }
+ }
+ }
+
+}
+
+private[predictionio] object EventOp {
+ // create EventOp from Event object
+ def apply(e: Event): EventOp = {
+ val t = e.eventTime.getMillis
+ e.event match {
+ case "$set" => {
+ val fields = e.properties.fields.mapValues(jv =>
+ PropTime(jv, t)
+ ).map(identity)
+
+ EventOp(
+ setProp = Some(SetProp(fields = fields, t = t)),
+ firstUpdated = Some(e.eventTime),
+ lastUpdated = Some(e.eventTime)
+ )
+ }
+ case "$unset" => {
+ val fields = e.properties.fields.mapValues(jv => t).map(identity)
+ EventOp(
+ unsetProp = Some(UnsetProp(fields = fields)),
+ firstUpdated = Some(e.eventTime),
+ lastUpdated = Some(e.eventTime)
+ )
+ }
+ case "$delete" => {
+ EventOp(
+ deleteEntity = Some(DeleteEntity(t)),
+ firstUpdated = Some(e.eventTime),
+ lastUpdated = Some(e.eventTime)
+ )
+ }
+ case _ => {
+ EventOp()
+ }
+ }
+ }
+}
+
+
+private[predictionio] object PEventAggregator {
+
+ val eventNames = List("$set", "$unset", "$delete")
+
+ def aggregateProperties(eventsRDD: RDD[Event]): RDD[(String, PropertyMap)] = {
+ eventsRDD
+ .map( e => (e.entityId, EventOp(e) ))
+ .aggregateByKey[EventOp](EventOp())(
+ // within same partition
+ seqOp = { case (u, v) => u ++ v },
+ // across partition
+ combOp = { case (accu, u) => accu ++ u }
+ )
+ .mapValues(_.toPropertyMap)
+ .filter{ case (k, v) => v.isDefined }
+ .map{ case (k, v) => (k, v.get) }
+ }
+
+}
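
The ++ operators above are order-insensitive merges, which is what makes the aggregateByKey call safe across partitions. A worked trace of toPropertyMap():

    // Partition 1 folds:  $set {a: 1} at t=10
    // Partition 2 folds:  $unset {a} at t=5, then $set {a: 2} at t=20
    // combOp merges the partial EventOps, in either order, into:
    //   setProp   = SetProp(fields = Map("a" -> PropTime(2, 20)), t = 20)
    //   unsetProp = UnsetProp(fields = Map("a" -> 5))
    // toPropertyMap keeps "a" because its unset time (5) is earlier than its
    // set time (20), so the result is a PropertyMap with {a: 2}.
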
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/PEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/PEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/PEvents.scala
new file mode 100644
index 0000000..49e5a5e
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/PEvents.scala
@@ -0,0 +1,182 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.annotation.Experimental
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.joda.time.DateTime
+
+import scala.reflect.ClassTag
+
+/** :: DeveloperApi ::
+ * Base trait of a data access object that returns [[Event]] related RDD data
+ * structure. Engine developers should use
+ * [[org.apache.predictionio.data.store.PEventStore]] instead of using this directly.
+ *
+ * @group Event Data
+ */
+@DeveloperApi
+trait PEvents extends Serializable {
+ @transient protected lazy val logger = Logger[this.type]
+ @deprecated("Use PEventStore.find() instead.", "0.9.2")
+ def getByAppIdAndTimeAndEntity(appId: Int,
+ startTime: Option[DateTime],
+ untilTime: Option[DateTime],
+ entityType: Option[String],
+ entityId: Option[String])(sc: SparkContext): RDD[Event] = {
+ find(
+ appId = appId,
+ startTime = startTime,
+ untilTime = untilTime,
+ entityType = entityType,
+ entityId = entityId,
+ eventNames = None
+ )(sc)
+ }
+
+ /** :: DeveloperApi ::
+ * Read from the database and return events. The deprecation here is intended
+ * for engine developers only.
+ *
+ * @param appId return events of this app ID
+ * @param channelId return events of this channel ID (default channel if it's None)
+ * @param startTime return events with eventTime >= startTime
+ * @param untilTime return events with eventTime < untilTime
+ * @param entityType return events of this entityType
+ * @param entityId return events of this entityId
+ * @param eventNames return events with any of these event names.
+ * @param targetEntityType return events of this targetEntityType:
+ * - None means no restriction on targetEntityType
+ * - Some(None) means no targetEntityType for this event
+ * - Some(Some(x)) means targetEntityType should match x.
+ * @param targetEntityId return events of this targetEntityId
+ * - None means no restriction on targetEntityId
+ * - Some(None) means no targetEntityId for this event
+ * - Some(Some(x)) means targetEntityId should match x.
+ * @param sc Spark context
+ * @return RDD[Event]
+ */
+ @deprecated("Use PEventStore.find() instead.", "0.9.2")
+ @DeveloperApi
+ def find(
+ appId: Int,
+ channelId: Option[Int] = None,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ entityType: Option[String] = None,
+ entityId: Option[String] = None,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event]
+
+ /** Aggregate properties of entities based on the special events
+ * \$set, \$unset, and \$delete. The deprecation here is intended
+ * for engine developers only.
+ *
+ * @param appId use events of this app ID
+ * @param channelId use events of this channel ID (default channel if it's None)
+ * @param entityType aggregate properties of the entities of this entityType
+ * @param startTime use events with eventTime >= startTime
+ * @param untilTime use events with eventTime < untilTime
+ * @param required only keep entities with these required properties defined
+ * @param sc Spark context
+ * @return RDD[(String, PropertyMap)] RDD of entityId and PropertyMap pair
+ */
+ @deprecated("Use PEventStore.aggregateProperties() instead.", "0.9.2")
+ def aggregateProperties(
+ appId: Int,
+ channelId: Option[Int] = None,
+ entityType: String,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ required: Option[Seq[String]] = None)
+ (sc: SparkContext): RDD[(String, PropertyMap)] = {
+ val eventRDD = find(
+ appId = appId,
+ channelId = channelId,
+ startTime = startTime,
+ untilTime = untilTime,
+ entityType = Some(entityType),
+ eventNames = Some(PEventAggregator.eventNames))(sc)
+
+ val dmRDD = PEventAggregator.aggregateProperties(eventRDD)
+
+ required map { r =>
+ dmRDD.filter { case (k, v) =>
+ r.map(v.contains(_)).reduce(_ && _)
+ }
+ } getOrElse dmRDD
+ }
+
+ /** :: Experimental ::
+ * Extract an EntityMap[A] from events of the given entityType.
+ * NOTE: the returned EntityMap[A] is local to the driver, not an RDD.
+ */
+ @deprecated("Use PEventStore.aggregateProperties() instead.", "0.9.2")
+ @Experimental
+ def extractEntityMap[A: ClassTag](
+ appId: Int,
+ entityType: String,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ required: Option[Seq[String]] = None)
+ (sc: SparkContext)(extract: DataMap => A): EntityMap[A] = {
+ val idToData: Map[String, A] = aggregateProperties(
+ appId = appId,
+ entityType = entityType,
+ startTime = startTime,
+ untilTime = untilTime,
+ required = required
+ )(sc).map{ case (id, dm) =>
+ try {
+ (id, extract(dm))
+ } catch {
+ case e: Exception => {
+ logger.error(s"Failed to get extract entity from DataMap $dm of " +
+ s"entityId $id.", e)
+ throw e
+ }
+ }
+ }.collectAsMap.toMap
+
+ new EntityMap(idToData)
+ }
+
+ /** :: DeveloperApi ::
+ * Write events to database
+ *
+ * @param events RDD of Event
+ * @param appId the app ID
+ * @param sc Spark Context
+ */
+ @DeveloperApi
+ def write(events: RDD[Event], appId: Int)(sc: SparkContext): Unit =
+ write(events, appId, None)(sc)
+
+ /** :: DeveloperApi ::
+ * Write events to database
+ *
+ * @param events RDD of Event
+ * @param appId the app ID
+ * @param channelId channel ID (default channel if it's None)
+ * @param sc Spark Context
+ */
+ @DeveloperApi
+ def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit
+}
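
As the deprecation notes above suggest, engine code should go through org.apache.predictionio.data.store.PEventStore instead. A sketch, assuming an active SparkContext and an app registered as "MyApp":

    import org.apache.predictionio.data.storage.PropertyMap
    import org.apache.predictionio.data.store.PEventStore
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // keep only user entities that have an "age" property defined
    def usersWithAge(sc: SparkContext): RDD[(String, PropertyMap)] =
      PEventStore.aggregateProperties(
        appName = "MyApp",
        entityType = "user",
        required = Some(Seq("age")))(sc)
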
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala b/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala
new file mode 100644
index 0000000..9935558
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala
@@ -0,0 +1,96 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.predictionio.data.storage
+
+import org.joda.time.DateTime
+
+import org.json4s.JValue
+import org.json4s.JObject
+import org.json4s.native.JsonMethods.parse
+
+/** A PropertyMap stores the aggregated properties of an entity.
+ * Internally it is a Map whose keys are property names and whose values are
+ * the corresponding JSON values. Use the get() method to retrieve the value
+ * of a mandatory property, or getOpt() to retrieve the value of an optional
+ * property.
+ *
+ * @param fields Map of property name to JValue
+ * @param firstUpdated first updated time of this PropertyMap
+ * @param lastUpdated last updated time of this PropertyMap
+ */
+class PropertyMap(
+ fields: Map[String, JValue],
+ val firstUpdated: DateTime,
+ val lastUpdated: DateTime
+) extends DataMap(fields) {
+
+ override
+ def toString: String = s"PropertyMap(${fields}, ${firstUpdated}, ${lastUpdated})"
+
+ override
+ def hashCode: Int =
+ 41 * (
+ 41 * (
+ 41 + fields.hashCode
+ ) + firstUpdated.hashCode
+ ) + lastUpdated.hashCode
+
+ override
+ def equals(other: Any): Boolean = other match {
+ case that: PropertyMap => {
+ (that.canEqual(this)) &&
+ (super.equals(that)) &&
+ (this.firstUpdated.equals(that.firstUpdated)) &&
+ (this.lastUpdated.equals(that.lastUpdated))
+ }
+ case that: DataMap => { // for testing purpose
+ super.equals(that)
+ }
+ case _ => false
+ }
+
+ override
+ def canEqual(other: Any): Boolean = other.isInstanceOf[PropertyMap]
+}
+
+/** Companion object of the [[PropertyMap]] class. */
+object PropertyMap {
+
+ /** Create a PropertyMap from a Map of String to JValue, with
+ * firstUpdated and lastUpdated times.
+ *
+ * @param fields a Map of String to JValue
+ * @param firstUpdated First updated time
+ * @param lastUpdated Last updated time
+ * @return a new PropertyMap
+ */
+ def apply(fields: Map[String, JValue],
+ firstUpdated: DateTime, lastUpdated: DateTime): PropertyMap =
+ new PropertyMap(fields, firstUpdated, lastUpdated)
+
+ /** Create a PropertyMap from a JSON String, with firstUpdated and
+ * lastUpdated times.
+ * @param js JSON String. eg """{ "a": 1, "b": "foo" }"""
+ * @param firstUpdated First updated time
+ * @param lastUpdated Last updated time
+ * @return a new PropertyMap
+ */
+ def apply(js: String, firstUpdated: DateTime, lastUpdated: DateTime)
+ : PropertyMap = apply(
+ fields = parse(js).asInstanceOf[JObject].obj.toMap,
+ firstUpdated = firstUpdated,
+ lastUpdated = lastUpdated
+ )
+}
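
For illustration, a PropertyMap built via the JSON factory above can be read
back with the get() and getOpt() accessors inherited from DataMap. A minimal
sketch (the times and values are illustrative):

    import org.joda.time.DateTime

    val props = PropertyMap(
      """{ "a": 1, "b": "foo" }""",
      firstUpdated = new DateTime(2015, 1, 1, 0, 0),
      lastUpdated = DateTime.now)
    val a = props.get[Int]("a")       // mandatory property: 1
    val c = props.getOpt[String]("c") // optional property: None here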
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
new file mode 100644
index 0000000..1f170be
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
@@ -0,0 +1,403 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import java.lang.reflect.InvocationTargetException
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.annotation.DeveloperApi
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.language.existentials
+import scala.reflect.runtime.universe._
+
+/** :: DeveloperApi ::
+ * Any storage backend driver needs to implement this trait with exactly
+ * '''StorageClient''' as the class name. The PredictionIO storage layer looks
+ * for this class when it instantiates the actual backend for use by
+ * higher-level storage access APIs.
+ *
+ * @group Storage System
+ */
+@DeveloperApi
+trait BaseStorageClient {
+ /** Configuration of the '''StorageClient''' */
+ val config: StorageClientConfig
+
+ /** The actual client object. This could be a database connection or any kind
+ * of database access object.
+ */
+ val client: AnyRef
+
+ /** Set a prefix for storage class discovery. As an example, if this prefix
+ * is set to ''JDBC'', when the storage layer instantiates an implementation
+ * of [[Apps]], it will look for a class named ''JDBCApps''.
+ */
+ val prefix: String = ""
+}
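
As an illustration of the discovery convention described above, a hypothetical
driver might look like the following sketch (''mydb'' and ''MyDB'' are
placeholder names, not part of this codebase):

    package org.apache.predictionio.data.storage.mydb

    import org.apache.predictionio.data.storage.{BaseStorageClient,
      StorageClientConfig}

    class StorageClient(val config: StorageClientConfig)
      extends BaseStorageClient {
      // A real driver would open a connection to its backend here.
      val client: AnyRef = new AnyRef
      // With this prefix, the storage layer resolves e.g. Apps to "MyDBApps".
      override val prefix = "MyDB"
    }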
+
+/** :: DeveloperApi ::
+ * A wrapper of storage client configuration that will be populated by
+ * PredictionIO automatically, and passed to the StorageClient during
+ * instantiation.
+ *
+ * @param parallel This is set to true by PredictionIO when the storage client
+ * is instantiated in a parallel data source.
+ * @param test This is set to true by PredictionIO when tests are being run.
+ * @param properties This is populated by PredictionIO automatically from
+ * environmental configuration variables. If you have these
+ * variables,
+ * - PIO_STORAGE_SOURCES_PGSQL_TYPE=jdbc
+ * - PIO_STORAGE_SOURCES_PGSQL_USERNAME=abc
+ * - PIO_STORAGE_SOURCES_PGSQL_PASSWORD=xyz
+ *
+ * this field will be filled as a map of string to string:
+ * - TYPE -> jdbc
+ * - USERNAME -> abc
+ * - PASSWORD -> xyz
+ *
+ * @group Storage System
+ */
+@DeveloperApi
+case class StorageClientConfig(
+ parallel: Boolean = false, // parallelized access (RDD)?
+ test: Boolean = false, // test mode config
+ properties: Map[String, String] = Map())
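
To make the PGSQL example above concrete, those three environment variables
would yield a configuration equivalent to this sketch:

    val conf = StorageClientConfig(
      parallel = false,
      test = false,
      properties = Map(
        "TYPE" -> "jdbc",
        "USERNAME" -> "abc",
        "PASSWORD" -> "xyz"))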
+
+/** :: DeveloperApi ::
+ * Thrown when a StorageClient runs into an exceptional condition
+ *
+ * @param message Exception error message
+ * @param cause The underlying exception that caused the exception
+ * @group Storage System
+ */
+@DeveloperApi
+class StorageClientException(message: String, cause: Throwable)
+ extends RuntimeException(message, cause)
+
+@deprecated("Use StorageException", "0.9.2")
+private[predictionio] case class StorageError(message: String)
+
+/** :: DeveloperApi ::
+ * Thrown by data access objects when they run into exceptional conditions
+ *
+ * @param message Exception error message
+ * @param cause The underlying exception that caused the exception
+ *
+ * @group Storage System
+ */
+@DeveloperApi
+class StorageException(message: String, cause: Throwable)
+ extends Exception(message, cause) {
+
+ def this(message: String) = this(message, null)
+}
+
+/** Backend-agnostic data storage layer with lazy initialization. Use this
+ * object when you need to interface with the Event Store from your engine.
+ *
+ * @group Storage System
+ */
+object Storage extends Logging {
+ private case class ClientMeta(
+ sourceType: String,
+ client: BaseStorageClient,
+ config: StorageClientConfig)
+
+ private case class DataObjectMeta(sourceName: String, namespace: String)
+
+ private var errors = 0
+
+ private val sourcesPrefix = "PIO_STORAGE_SOURCES"
+
+ private val sourceTypesRegex = """PIO_STORAGE_SOURCES_([^_]+)_TYPE""".r
+
+ private val sourceKeys: Seq[String] = sys.env.keys.toSeq.flatMap { k =>
+ sourceTypesRegex findFirstIn k match {
+ case Some(sourceTypesRegex(sourceType)) => Seq(sourceType)
+ case None => Nil
+ }
+ }
+
+ if (sourceKeys.isEmpty) warn("There is no properly configured data source.")
+
+ private val s2cm = scala.collection.mutable.Map[String, Option[ClientMeta]]()
+
+ /** Reference to the event data repository. */
+ private val EventDataRepository = "EVENTDATA"
+ private val ModelDataRepository = "MODELDATA"
+ private val MetaDataRepository = "METADATA"
+
+ private val repositoriesPrefix = "PIO_STORAGE_REPOSITORIES"
+
+ private val repositoryNamesRegex =
+ """PIO_STORAGE_REPOSITORIES_([^_]+)_NAME""".r
+
+ private val repositoryKeys: Seq[String] = sys.env.keys.toSeq.flatMap { k =>
+ repositoryNamesRegex findFirstIn k match {
+ case Some(repositoryNamesRegex(repositoryName)) => Seq(repositoryName)
+ case None => Nil
+ }
+ }
+
+ if (repositoryKeys.isEmpty) {
+ warn("There is no properly configured repository.")
+ }
+
+ private val requiredRepositories = Seq(MetaDataRepository)
+
+ requiredRepositories foreach { r =>
+ if (!repositoryKeys.contains(r)) {
+ error(s"Required repository (${r}) configuration is missing.")
+ errors += 1
+ }
+ }
+ private val repositoriesToDataObjectMeta: Map[String, DataObjectMeta] =
+ repositoryKeys.map(r =>
+ try {
+ val keyedPath = repositoriesPrefixPath(r)
+ val name = sys.env(prefixPath(keyedPath, "NAME"))
+ val sourceName = sys.env(prefixPath(keyedPath, "SOURCE"))
+ if (sourceKeys.contains(sourceName)) {
+ r -> DataObjectMeta(
+ sourceName = sourceName,
+ namespace = name)
+ } else {
+ error(s"$sourceName is not a configured storage source.")
+ r -> DataObjectMeta("", "")
+ }
+ } catch {
+ case e: Throwable =>
+ error(e.getMessage)
+ errors += 1
+ r -> DataObjectMeta("", "")
+ }
+ ).toMap
+
+ if (errors > 0) {
+ error(s"There were $errors configuration errors. Exiting.")
+ sys.exit(errors)
+ }
+
+ // End of constructor and field definitions; method definitions begin below
+
+ private def prefixPath(prefix: String, body: String) = s"${prefix}_$body"
+
+ private def sourcesPrefixPath(body: String) = prefixPath(sourcesPrefix, body)
+
+ private def repositoriesPrefixPath(body: String) =
+ prefixPath(repositoriesPrefix, body)
+
+ private def sourcesToClientMeta(
+ source: String,
+ parallel: Boolean,
+ test: Boolean): Option[ClientMeta] = {
+ val sourceName = if (parallel) s"parallel-$source" else source
+ s2cm.getOrElseUpdate(sourceName, updateS2CM(source, parallel, test))
+ }
+
+ private def getClient(
+ clientConfig: StorageClientConfig,
+ pkg: String): BaseStorageClient = {
+ val className = "org.apache.predictionio.data.storage." + pkg + ".StorageClient"
+ try {
+ Class.forName(className).getConstructors()(0).newInstance(clientConfig).
+ asInstanceOf[BaseStorageClient]
+ } catch {
+ case e: ClassNotFoundException =>
+ val originalClassName = pkg + ".StorageClient"
+ Class.forName(originalClassName).getConstructors()(0).
+ newInstance(clientConfig).asInstanceOf[BaseStorageClient]
+ case e: java.lang.reflect.InvocationTargetException =>
+ throw e.getCause
+ }
+ }
+
+ /** Get the StorageClient config data from PIO Framework's environment variables */
+ def getConfig(sourceName: String): Option[StorageClientConfig] = {
+ // Flatten the Option[Option[ClientMeta]] and project out the config.
+ s2cm.get(sourceName).flatten.map(_.config)
+ }
+
+ private def updateS2CM(k: String, parallel: Boolean, test: Boolean):
+ Option[ClientMeta] = {
+ try {
+ val keyedPath = sourcesPrefixPath(k)
+ val sourceType = sys.env(prefixPath(keyedPath, "TYPE"))
+ val props = sys.env.filter(t => t._1.startsWith(keyedPath)).map(
+ t => t._1.replace(s"${keyedPath}_", "") -> t._2)
+ val clientConfig = StorageClientConfig(
+ properties = props,
+ parallel = parallel,
+ test = test)
+ val client = getClient(clientConfig, sourceType)
+ Some(ClientMeta(sourceType, client, clientConfig))
+ } catch {
+ case e: Throwable =>
+ error(s"Error initializing storage client for source ${k}", e)
+ errors += 1
+ None
+ }
+ }
+
+ private[predictionio]
+ def getDataObjectFromRepo[T](repo: String, test: Boolean = false)
+ (implicit tag: TypeTag[T]): T = {
+ val repoDOMeta = repositoriesToDataObjectMeta(repo)
+ val repoDOSourceName = repoDOMeta.sourceName
+ getDataObject[T](repoDOSourceName, repoDOMeta.namespace, test = test)
+ }
+
+ private[predictionio]
+ def getPDataObject[T](repo: String)(implicit tag: TypeTag[T]): T = {
+ val repoDOMeta = repositoriesToDataObjectMeta(repo)
+ val repoDOSourceName = repoDOMeta.sourceName
+ getPDataObject[T](repoDOSourceName, repoDOMeta.namespace)
+ }
+
+ private[predictionio] def getDataObject[T](
+ sourceName: String,
+ namespace: String,
+ parallel: Boolean = false,
+ test: Boolean = false)(implicit tag: TypeTag[T]): T = {
+ val clientMeta = sourcesToClientMeta(sourceName, parallel, test) getOrElse {
+ throw new StorageClientException(
+ s"Data source $sourceName was not properly initialized.", null)
+ }
+ val sourceType = clientMeta.sourceType
+ val ctorArgs = dataObjectCtorArgs(clientMeta.client, namespace)
+ val classPrefix = clientMeta.client.prefix
+ val originalClassName = tag.tpe.toString.split('.')
+ val rawClassName = sourceType + "." + classPrefix + originalClassName.last
+ val className = "org.apache.predictionio.data.storage." + rawClassName
+ val clazz = try {
+ Class.forName(className)
+ } catch {
+ case e: ClassNotFoundException =>
+ try {
+ Class.forName(rawClassName)
+ } catch {
+ case e: ClassNotFoundException =>
+ throw new StorageClientException("No storage backend " +
+ "implementation can be found (tried both " +
+ s"$className and $rawClassName)", e)
+ }
+ }
+ val constructor = clazz.getConstructors()(0)
+ try {
+ constructor.newInstance(ctorArgs: _*).
+ asInstanceOf[T]
+ } catch {
+ case e: IllegalArgumentException =>
+ error(
+ "Unable to instantiate data object with class '" +
+ constructor.getDeclaringClass.getName + "' because its constructor" +
+ " does not have the right number of arguments." +
+ " Number of required constructor arguments: " +
+ ctorArgs.size + "." +
+ " Number of existing constructor arguments: " +
+ constructor.getParameterTypes.size + "." +
+ s" Storage source name: ${sourceName}." +
+ s" Exception message: ${e.getMessage}).", e)
+ errors += 1
+ throw e
+ case e: java.lang.reflect.InvocationTargetException =>
+ throw e.getCause
+ }
+ }
+
+ private def getPDataObject[T](
+ sourceName: String,
+ databaseName: String)(implicit tag: TypeTag[T]): T =
+ getDataObject[T](sourceName, databaseName, true)
+
+ private def dataObjectCtorArgs(
+ client: BaseStorageClient,
+ namespace: String): Seq[AnyRef] = {
+ Seq(client.client, client.config, namespace)
+ }
+
+ private[predictionio] def verifyAllDataObjects(): Unit = {
+ info("Verifying Meta Data Backend (Source: " +
+ s"${repositoriesToDataObjectMeta(MetaDataRepository).sourceName})...")
+ getMetaDataEngineManifests()
+ getMetaDataEngineInstances()
+ getMetaDataEvaluationInstances()
+ getMetaDataApps()
+ getMetaDataAccessKeys()
+ info("Verifying Model Data Backend (Source: " +
+ s"${repositoriesToDataObjectMeta(ModelDataRepository).sourceName})...")
+ getModelDataModels()
+ info("Verifying Event Data Backend (Source: " +
+ s"${repositoriesToDataObjectMeta(EventDataRepository).sourceName})...")
+ val eventsDb = getLEvents(test = true)
+ info("Test writing to Event Store (App Id 0)...")
+ // use appId=0 for testing purposes
+ eventsDb.init(0)
+ eventsDb.insert(Event(
+ event = "test",
+ entityType = "test",
+ entityId = "test"), 0)
+ eventsDb.remove(0)
+ eventsDb.close()
+ }
+
+ private[predictionio] def getMetaDataEngineManifests(): EngineManifests =
+ getDataObjectFromRepo[EngineManifests](MetaDataRepository)
+
+ private[predictionio] def getMetaDataEngineInstances(): EngineInstances =
+ getDataObjectFromRepo[EngineInstances](MetaDataRepository)
+
+ private[predictionio] def getMetaDataEvaluationInstances(): EvaluationInstances =
+ getDataObjectFromRepo[EvaluationInstances](MetaDataRepository)
+
+ private[predictionio] def getMetaDataApps(): Apps =
+ getDataObjectFromRepo[Apps](MetaDataRepository)
+
+ private[predictionio] def getMetaDataAccessKeys(): AccessKeys =
+ getDataObjectFromRepo[AccessKeys](MetaDataRepository)
+
+ private[predictionio] def getMetaDataChannels(): Channels =
+ getDataObjectFromRepo[Channels](MetaDataRepository)
+
+ private[predictionio] def getModelDataModels(): Models =
+ getDataObjectFromRepo[Models](ModelDataRepository)
+
+ /** Obtains a data access object that returns [[Event]] related local data
+ * structure.
+ */
+ def getLEvents(test: Boolean = false): LEvents =
+ getDataObjectFromRepo[LEvents](EventDataRepository, test = test)
+
+ /** Obtains a data access object that returns [[Event]] related RDD data
+ * structure.
+ */
+ def getPEvents(): PEvents =
+ getPDataObject[PEvents](EventDataRepository)
+
+ def config: Map[String, Map[String, Map[String, String]]] = Map(
+ "sources" -> s2cm.toMap.map { case (source, clientMeta) =>
+ source -> clientMeta.map { cm =>
+ Map(
+ "type" -> cm.sourceType,
+ "config" -> cm.config.properties.map(t => s"${t._1} -> ${t._2}").mkString(", ")
+ )
+ }.getOrElse(Map.empty)
+ }
+ )
+}
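
For engine code, typical use of this object mirrors the verification routine
above. A sketch (the appId and event values are illustrative):

    import scala.concurrent.ExecutionContext.Implicits.global

    val eventsDb = Storage.getLEvents()
    eventsDb.init(1) // prepare storage for appId 1, as init(0) does above
    eventsDb.insert(Event(
      event = "rate",
      entityType = "user",
      entityId = "u0"), 1)
    eventsDb.close()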
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Utils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Utils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Utils.scala
new file mode 100644
index 0000000..321b245
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Utils.scala
@@ -0,0 +1,47 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.joda.time.DateTime
+import org.joda.time.format.ISODateTimeFormat
+
+/** Backend-agnostic storage utilities. */
+private[predictionio] object Utils {
+ /**
+ * Add prefix to custom attribute keys.
+ */
+ def addPrefixToAttributeKeys[T](
+ attributes: Map[String, T],
+ prefix: String = "ca_"): Map[String, T] = {
+ attributes map { case (k, v) => (prefix + k, v) }
+ }
+
+ /** Remove prefix from custom attribute keys. */
+ def removePrefixFromAttributeKeys[T](
+ attributes: Map[String, T],
+ prefix: String = "ca_"): Map[String, T] = {
+ attributes map { case (k, v) => (k.stripPrefix(prefix), v) }
+ }
+
+ /**
+ * Appends App ID to any ID.
+ * Used for distinguishing different apps' data within a single collection.
+ */
+ def idWithAppid(appid: Int, id: String): String = appid + "_" + id
+
+ def stringToDateTime(dt: String): DateTime =
+ ISODateTimeFormat.dateTimeParser.parseDateTime(dt)
+}
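
A quick sketch of these helpers in action (the object is package-private, so
they are callable only from within org.apache.predictionio):

    val prefixed = Utils.addPrefixToAttributeKeys(Map("color" -> "red"))
    // prefixed == Map("ca_color" -> "red")
    val restored = Utils.removePrefixFromAttributeKeys(prefixed)
    // restored == Map("color" -> "red")
    val id = Utils.idWithAppid(3, "item42") // "3_item42"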
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
new file mode 100644
index 0000000..7853d97
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -0,0 +1,116 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.AccessKey
+import org.apache.predictionio.data.storage.AccessKeys
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+import scala.util.Random
+
+/** Elasticsearch implementation of AccessKeys. */
+class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
+ extends AccessKeys with Logging {
+ implicit val formats = DefaultFormats.lossless
+ private val estype = "accesskeys"
+
+ val indices = client.admin.indices
+ val indexExistResponse = indices.prepareExists(index).get
+ if (!indexExistResponse.isExists) {
+ indices.prepareCreate(index).get
+ }
+ val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+ if (!typeExistResponse.isExists) {
+ val json =
+ (estype ->
+ ("properties" ->
+ ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+ indices.preparePutMapping(index).setType(estype).
+ setSource(compact(render(json))).get
+ }
+
+ def insert(accessKey: AccessKey): Option[String] = {
+ val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
+ update(accessKey.copy(key = key))
+ Some(key)
+ }
+
+ def get(key: String): Option[AccessKey] = {
+ try {
+ val response = client.prepareGet(
+ index,
+ estype,
+ key).get()
+ Some(read[AccessKey](response.getSourceAsString))
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ None
+ case e: NullPointerException => None
+ }
+ }
+
+ def getAll(): Seq[AccessKey] = {
+ try {
+ val builder = client.prepareSearch(index).setTypes(estype)
+ ESUtils.getAll[AccessKey](client, builder)
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ Seq[AccessKey]()
+ }
+ }
+
+ def getByAppid(appid: Int): Seq[AccessKey] = {
+ try {
+ val builder = client.prepareSearch(index).setTypes(estype).
+ setPostFilter(termFilter("appid", appid))
+ ESUtils.getAll[AccessKey](client, builder)
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ Seq[AccessKey]()
+ }
+ }
+
+ def update(accessKey: AccessKey): Unit = {
+ try {
+ client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get()
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ }
+ }
+
+ def delete(key: String): Unit = {
+ try {
+ client.prepareDelete(index, estype, key).get
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ }
+ }
+}
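
Note how insert() above generates a key when given an empty one. A usage
sketch, assuming an existing Elasticsearch Client (esClient), a
StorageClientConfig (clientConfig), an illustrative index name, and the
standard AccessKey(key, appid, events) shape:

    val accessKeys = new ESAccessKeys(esClient, clientConfig, "pio_meta")
    // An empty key asks the implementation to generate one.
    val generated = accessKeys.insert(AccessKey("", appid = 1, events = Seq()))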
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
new file mode 100644
index 0000000..6790b52
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -0,0 +1,127 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.App
+import org.apache.predictionio.data.storage.Apps
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Elasticsearch implementation of Apps. */
+class ESApps(client: Client, config: StorageClientConfig, index: String)
+ extends Apps with Logging {
+ implicit val formats = DefaultFormats.lossless
+ private val estype = "apps"
+ private val seq = new ESSequences(client, config, index)
+
+ val indices = client.admin.indices
+ val indexExistResponse = indices.prepareExists(index).get
+ if (!indexExistResponse.isExists) {
+ indices.prepareCreate(index).get
+ }
+ val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+ if (!typeExistResponse.isExists) {
+ val json =
+ (estype ->
+ ("properties" ->
+ ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+ indices.preparePutMapping(index).setType(estype).
+ setSource(compact(render(json))).get
+ }
+
+ def insert(app: App): Option[Int] = {
+ val id =
+ if (app.id == 0) {
+ var roll = seq.genNext("apps")
+ while (get(roll).isDefined) roll = seq.genNext("apps")
+ roll
+ }
+ else app.id
+ val realapp = app.copy(id = id)
+ update(realapp)
+ Some(id)
+ }
+
+ def get(id: Int): Option[App] = {
+ try {
+ val response = client.prepareGet(
+ index,
+ estype,
+ id.toString).get()
+ Some(read[App](response.getSourceAsString))
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ None
+ case e: NullPointerException => None
+ }
+ }
+
+ def getByName(name: String): Option[App] = {
+ try {
+ val response = client.prepareSearch(index).setTypes(estype).
+ setPostFilter(termFilter("name", name)).get
+ val hits = response.getHits().hits()
+ if (hits.size > 0) {
+ Some(read[App](hits.head.getSourceAsString))
+ } else {
+ None
+ }
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ None
+ }
+ }
+
+ def getAll(): Seq[App] = {
+ try {
+ val builder = client.prepareSearch(index).setTypes(estype)
+ ESUtils.getAll[App](client, builder)
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ Seq[App]()
+ }
+ }
+
+ def update(app: App): Unit = {
+ try {
+ client.prepareIndex(index, estype, app.id.toString).
+ setSource(write(app)).get()
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ }
+ }
+
+ def delete(id: Int): Unit = {
+ try {
+ client.prepareDelete(index, estype, id.toString).get
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ }
+ }
+}
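
A matching sketch for ESApps, under the same assumptions (esClient and
clientConfig exist; App(id, name, description) is the standard shape; id == 0
triggers ID generation, as in insert() above):

    val apps = new ESApps(esClient, clientConfig, "pio_meta")
    val newId = apps.insert(App(id = 0, name = "MyApp", description = None))
    val myApp = apps.getByName("MyApp")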