You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by sh...@apache.org on 2017/06/12 13:29:27 UTC
incubator-predictionio git commit: [PIO-90] Improve /batch/events.json endpoint performance
Repository: incubator-predictionio
Updated Branches:
refs/heads/develop a36fbacae -> 4b172f57e
[PIO-90] Improve /batch/events.json endpoint performance
Closes #386
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/4b172f57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/4b172f57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/4b172f57
Branch: refs/heads/develop
Commit: 4b172f57e03d2fe022a9d21100b233b1703d7758
Parents: a36fbac
Author: Naoki Takezoe <ta...@apache.org>
Authored: Mon Jun 12 22:27:57 2017 +0900
Committer: Shinsuke Sugaya <sh...@apache.org>
Committed: Mon Jun 12 22:27:57 2017 +0900
----------------------------------------------------------------------
.../predictionio/data/api/EventServer.scala | 96 ++++++++++++--------
.../predictionio/data/storage/LEvents.scala | 21 +++++
.../data/storage/elasticsearch/ESLEvents.scala | 76 ++++++++++++++--
.../data/storage/hbase/HBLEvents.scala | 14 +++
.../data/storage/jdbc/JDBCLEvents.scala | 59 ++++++++++--
5 files changed, 215 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
index b4392ff..75c2227 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
@@ -384,50 +384,70 @@ class EventServiceActor(
val appId = authData.appId
val channelId = authData.channelId
val allowedEvents = authData.events
- val handleEvent: PartialFunction[Try[Event], Future[Map[String, Any]]] = {
- case Success(event) => {
- if (allowedEvents.isEmpty || allowedEvents.contains(event.event)) {
- pluginContext.inputBlockers.values.foreach(
- _.process(EventInfo(
- appId = appId,
- channelId = channelId,
- event = event), pluginContext))
- val data = eventClient.futureInsert(event, appId, channelId).map { id =>
- pluginsActorRef ! EventInfo(
- appId = appId,
- channelId = channelId,
- event = event)
- val status = StatusCodes.Created
- val result = Map(
- "status" -> status.intValue,
- "eventId" -> s"${id}")
- if (config.stats) {
- statsActorRef ! Bookkeeping(appId, status, event)
+
+ entity(as[Seq[Try[Event]]]) { events =>
+ complete {
+ if (events.length <= MaxNumberOfEventsPerBatchRequest) {
+ val eventWithIndex = events.zipWithIndex
+
+ val taggedEvents = eventWithIndex.collect { case (Success(event), i) =>
+ if(allowedEvents.isEmpty || allowedEvents.contains(event.event)){
+ (Right(event), i)
+ } else {
+ (Left(event), i)
}
- result
- }.recover { case exception =>
+ }
+
+ val insertEvents = taggedEvents.collect { case (Right(event), i) =>
+ (event, i)
+ }
+
+ insertEvents.foreach { case (event, i) =>
+ pluginContext.inputBlockers.values.foreach(
+ _.process(EventInfo(
+ appId = appId,
+ channelId = channelId,
+ event = event), pluginContext))
+ }
+
+ val f: Future[Seq[Map[String, Any]]] = eventClient.futureInsertBatch(
+ insertEvents.map(_._1), appId, channelId).map { insertResults =>
+ val results = insertResults.zip(insertEvents).map { case (id, (event, i)) =>
+ pluginsActorRef ! EventInfo(
+ appId = appId,
+ channelId = channelId,
+ event = event)
+ val status = StatusCodes.Created
+ if (config.stats) {
+ statsActorRef ! Bookkeeping(appId, status, event)
+ }
+ (Map(
+ "status" -> status.intValue,
+ "eventId" -> s"${id}"), i)
+ } ++
+ // Results of denied events
+ taggedEvents.collect { case (Left(event), i) =>
+ (Map(
+ "status" -> StatusCodes.Forbidden.intValue,
+ "message" -> s"${event.event} events are not allowed"), i)
+ } ++
+ // Results of failed to deserialze events
+ eventWithIndex.collect { case (Failure(exception), i) =>
+ (Map(
+ "status" -> StatusCodes.BadRequest.intValue,
+ "message" -> s"${exception.getMessage()}"), i)
+ }
+
+ // Restore original order
+ results.sortBy { case (_, i) => i }.map { case (data, _) => data }
+ }
+
+ f.recover { case exception =>
Map(
"status" -> StatusCodes.InternalServerError.intValue,
"message" -> s"${exception.getMessage()}")
}
- data
- } else {
- Future.successful(Map(
- "status" -> StatusCodes.Forbidden.intValue,
- "message" -> s"${event.event} events are not allowed"))
- }
- }
- case Failure(exception) => {
- Future.successful(Map(
- "status" -> StatusCodes.BadRequest.intValue,
- "message" -> s"${exception.getMessage()}"))
- }
- }
- entity(as[Seq[Try[Event]]]) { events =>
- complete {
- if (events.length <= MaxNumberOfEventsPerBatchRequest) {
- Future.traverse(events)(handleEvent)
} else {
(StatusCodes.BadRequest,
Map("message" -> (s"Batch request must have less than or equal to " +
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
index f7a980c..65b8779 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
@@ -91,6 +91,27 @@ trait LEvents {
event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[String]
/** :: DeveloperApi ::
+    * Inserts a batch of [[Event]]s in a non-blocking fashion.
+    *
+    * This default issues one `futureInsert` call per event and combines the
+    * resulting futures, so the returned IDs are in the same order as `events`
+    * and the returned future fails if any single insert fails. Storage
+    * implementations with a native bulk-write path should override it.
+    *
+    * @param events [[Event]]s to be inserted
+    * @param appId App ID for the [[Event]]s to be inserted to
+    * @param channelId Optional channel ID for the [[Event]]s to be inserted to
+    * @return a future of the inserted event IDs, one per entry of `events`
+    */
+  @DeveloperApi
+  def futureInsertBatch(events: Seq[Event], appId: Int, channelId: Option[Int])
+    (implicit ec: ExecutionContext): Future[Seq[String]] =
+    Future.traverse(events) { event => futureInsert(event, appId, channelId) }
+
+ /** :: DeveloperApi ::
* Get an [[Event]] in a non-blocking fashion.
*
* @param eventId ID of the [[Event]]
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index 6cf7a9a..6240059 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -22,8 +22,7 @@ import java.io.IOException
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
-
-import org.apache.http.entity.ContentType
+import org.apache.http.entity.{ContentType, StringEntity}
import org.apache.http.nio.entity.NStringEntity
import org.apache.http.util.EntityUtils
import org.apache.predictionio.data.storage.Event
@@ -34,13 +33,11 @@ import org.joda.time.DateTime
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
import org.json4s.native.Serialization.write
import org.json4s.ext.JodaTimeSerializers
-
import grizzled.slf4j.Logging
import org.elasticsearch.client.ResponseException
-import org.apache.http.entity.StringEntity
+import org.apache.http.message.BasicHeader
class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String)
extends LEvents with Logging {
@@ -130,7 +127,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
("prId" -> event.prId) ~
("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~
("properties" -> write(event.properties.toJObject))
- val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON);
+ val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
val response = restClient.performRequest(
"POST",
s"/$index/$estype/$id",
@@ -153,6 +150,73 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
}
}
+  /** Inserts `events` with a single Elasticsearch `_bulk` request.
+    *
+    * Events without an `eventId` get a new ID from `ESEventsUtil.getBase64UUID`.
+    * The result holds exactly one entry per input event, in input order: the
+    * stored ID when the bulk item's `result` is `created` or `updated`,
+    * otherwise an empty string (the failure is logged). If the request itself
+    * throws an `IOException`, it is logged and every entry is an empty string,
+    * so callers that zip the result with their input never lose events.
+    *
+    * NOTE(review): `restClient` is not opened in this method, while `exists`
+    * takes its `RestClient` as a parameter — confirm `restClient` is in scope
+    * here and who is responsible for closing it.
+    */
+  override def futureInsertBatch(
+    events: Seq[Event],
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[Seq[String]] = {
+    if (events.isEmpty) {
+      // An empty _bulk body is rejected by Elasticsearch; skip the round trip.
+      Future.successful(Nil)
+    } else {
+      Future {
+        val estype = getEsType(appId, channelId)
+        val ids = events.map { event =>
+          event.eventId.getOrElse(ESEventsUtil.getBase64UUID)
+        }
+        try {
+          // _bulk takes newline-delimited JSON: an action line followed by a
+          // document line per event, terminated by a trailing newline.
+          val body = events.zip(ids).map { case (event, id) =>
+            val commandJson =
+              ("index" -> (
+                ("_index" -> index) ~
+                ("_type" -> estype) ~
+                ("_id" -> id)
+              ))
+
+            val documentJson =
+              ("eventId" -> id) ~
+              ("event" -> event.event) ~
+              ("entityType" -> event.entityType) ~
+              ("entityId" -> event.entityId) ~
+              ("targetEntityType" -> event.targetEntityType) ~
+              ("targetEntityId" -> event.targetEntityId) ~
+              ("eventTime" -> ESUtils.formatUTCDateTime(event.eventTime)) ~
+              ("tags" -> event.tags) ~
+              ("prId" -> event.prId) ~
+              ("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~
+              ("properties" -> write(event.properties.toJObject))
+
+            compact(render(commandJson)) + "\n" + compact(render(documentJson))
+          }.mkString("", "\n", "\n")
+
+          // new StringEntity(String) encodes with ISO-8859-1 and would mangle
+          // non-Latin-1 event data; encode the body as UTF-8 explicitly.
+          val entity = new StringEntity(body, ContentType.create("application/x-ndjson", "UTF-8"))
+          val response = restClient.performRequest(
+            "POST",
+            "/_bulk",
+            Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
+            entity,
+            new BasicHeader("Content-Type", "application/x-ndjson"))
+
+          val responseJson = parse(EntityUtils.toString(response.getEntity))
+          responseJson \ "items" match {
+            // One item per action line, matched to the requested IDs by position.
+            case JArray(items) if items.length == ids.length =>
+              items.zip(ids).map { case (item, requestedId) =>
+                val action = item \ "index"
+                val id = (action \ "_id").extractOpt[String].getOrElse(requestedId)
+                // Items without a created/updated result (e.g. ones Elasticsearch
+                // rejected) are logged and reported as an empty ID.
+                (action \ "result").extractOpt[String] match {
+                  case Some("created") | Some("updated") => id
+                  case other =>
+                    error(s"[${other.getOrElse("no result")}] Failed to update $index/$estype/$id")
+                    ""
+                }
+              }
+            case _ =>
+              error(s"Unexpected _bulk response (missing or mismatched items) for $index/$estype")
+              ids.map(_ => "")
+          }
+        } catch {
+          case e: IOException =>
+            error(s"Failed to update $index/$estype/<id>", e)
+            // Same empty-ID convention as per-item failures, one entry per event.
+            ids.map(_ => "")
+        }
+      }
+    }
+  }
+
private def exists(restClient: RestClient, estype: String, id: Int): Boolean = {
try {
restClient.performRequest(
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
index 360b007..e95e7e8 100644
--- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
@@ -110,6 +110,20 @@ class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace
}
override
+  def futureInsertBatch(
+    events: Seq[Event], appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
+    Future[Seq[String]] = {
+    // Converts every event to an HBase Put, writes them with one table.put call,
+    // flushes, and returns the row keys as strings in the same order as `events`.
+    Future {
+      val (puts, rowKeys) = events.map { event => HBEventsUtil.eventToPut(event, appId) }.unzip
+      val table = getTable(appId, channelId)
+      try {
+        table.put(puts)
+        table.flushCommits()
+      } finally {
+        // Release the table even when put/flushCommits throws.
+        table.close()
+      }
+      rowKeys.map(_.toString)
+    }
+  }
+
+ override
def futureGet(
eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
Future[Option[Event]] = {
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
index dddef67..b4230cc 100644
--- a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
@@ -40,7 +40,7 @@ class JDBCLEvents(
namespace: String) extends LEvents with Logging {
implicit private val formats = org.json4s.DefaultFormats
- def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+ override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
// To use index, it must be varchar less than 255 characters on a VARCHAR column
val useIndex = config.properties.contains("INDEX") &&
@@ -91,7 +91,7 @@ class JDBCLEvents(
}
}
- def remove(appId: Int, channelId: Option[Int] = None): Boolean =
+ override def remove(appId: Int, channelId: Option[Int] = None): Boolean =
DB autoCommit { implicit session =>
SQL(s"""
drop table ${JDBCUtils.eventTableName(namespace, appId, channelId)}
@@ -99,9 +99,9 @@ class JDBCLEvents(
true
}
- def close(): Unit = ConnectionPool.closeAll()
+ override def close(): Unit = ConnectionPool.closeAll()
- def futureInsert(event: Event, appId: Int, channelId: Option[Int])(
+ override def futureInsert(event: Event, appId: Int, channelId: Option[Int])(
implicit ec: ExecutionContext): Future[String] = Future {
DB localTx { implicit session =>
val id = event.eventId.getOrElse(JDBCUtils.generateId)
@@ -127,7 +127,52 @@ class JDBCLEvents(
}
}
- def futureGet(eventId: String, appId: Int, channelId: Option[Int])(
+  /** Inserts `events` into the app/channel event table as one JDBC batch
+    * statement, inside a single local transaction.
+    *
+    * Events without an `eventId` get one from `JDBCUtils.generateId`; the
+    * returned IDs are in the same order as `events`.
+    */
+  override def futureInsertBatch(events: Seq[Event], appId: Int, channelId: Option[Int])(
+    implicit ec: ExecutionContext): Future[Seq[String]] = Future {
+    DB localTx { implicit session =>
+      val eventIds = events.map { event => event.eventId.getOrElse(JDBCUtils.generateId) }
+
+      // Named parameters for one row; the keys match the {placeholders} below.
+      def rowParams(event: Event, eventId: String) = {
+        val joinedTags = if (event.tags.isEmpty) None else Some(event.tags.mkString(","))
+        Seq(
+          'id -> eventId,
+          'event -> event.event,
+          'entityType -> event.entityType,
+          'entityId -> event.entityId,
+          'targetEntityType -> event.targetEntityType,
+          'targetEntityId -> event.targetEntityId,
+          'properties -> write(event.properties.toJObject),
+          'eventTime -> event.eventTime,
+          'eventTimeZone -> event.eventTime.getZone.getID,
+          'tags -> joinedTags,
+          'prId -> event.prId,
+          'creationTime -> event.creationTime,
+          'creationTimeZone -> event.creationTime.getZone.getID
+        )
+      }
+
+      val batchParams = for ((event, eventId) <- events.zip(eventIds)) yield rowParams(event, eventId)
+      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+      sql"""
+ insert into $tableName values(
+ {id},
+ {event},
+ {entityType},
+ {entityId},
+ {targetEntityType},
+ {targetEntityId},
+ {properties},
+ {eventTime},
+ {eventTimeZone},
+ {tags},
+ {prId},
+ {creationTime},
+ {creationTimeZone}
+ )
+ """.batchByName(batchParams: _*).apply()
+
+      eventIds
+    }
+  }
+
+ override def futureGet(eventId: String, appId: Int, channelId: Option[Int])(
implicit ec: ExecutionContext): Future[Option[Event]] = Future {
DB readOnly { implicit session =>
val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
@@ -152,7 +197,7 @@ class JDBCLEvents(
}
}
- def futureDelete(eventId: String, appId: Int, channelId: Option[Int])(
+ override def futureDelete(eventId: String, appId: Int, channelId: Option[Int])(
implicit ec: ExecutionContext): Future[Boolean] = Future {
DB localTx { implicit session =>
val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
@@ -163,7 +208,7 @@ class JDBCLEvents(
}
}
- def futureFind(
+ override def futureFind(
appId: Int,
channelId: Option[Int] = None,
startTime: Option[DateTime] = None,