You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by sh...@apache.org on 2017/06/12 13:29:27 UTC

incubator-predictionio git commit: [PIO-90] Improve /batch/events.json endpoint performance

Repository: incubator-predictionio
Updated Branches:
  refs/heads/develop a36fbacae -> 4b172f57e


[PIO-90] Improve /batch/events.json endpoint performance

Closes #386


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/4b172f57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/4b172f57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/4b172f57

Branch: refs/heads/develop
Commit: 4b172f57e03d2fe022a9d21100b233b1703d7758
Parents: a36fbac
Author: Naoki Takezoe <ta...@apache.org>
Authored: Mon Jun 12 22:27:57 2017 +0900
Committer: Shinsuke Sugaya <sh...@apache.org>
Committed: Mon Jun 12 22:27:57 2017 +0900

----------------------------------------------------------------------
 .../predictionio/data/api/EventServer.scala     | 96 ++++++++++++--------
 .../predictionio/data/storage/LEvents.scala     | 21 +++++
 .../data/storage/elasticsearch/ESLEvents.scala  | 76 ++++++++++++++--
 .../data/storage/hbase/HBLEvents.scala          | 14 +++
 .../data/storage/jdbc/JDBCLEvents.scala         | 59 ++++++++++--
 5 files changed, 215 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
index b4392ff..75c2227 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
@@ -384,50 +384,70 @@ class  EventServiceActor(
               val appId = authData.appId
               val channelId = authData.channelId
               val allowedEvents = authData.events
-              val handleEvent: PartialFunction[Try[Event], Future[Map[String, Any]]] = {
-                case Success(event) => {
-                  if (allowedEvents.isEmpty || allowedEvents.contains(event.event)) {
-                    pluginContext.inputBlockers.values.foreach(
-                      _.process(EventInfo(
-                        appId = appId,
-                        channelId = channelId,
-                        event = event), pluginContext))
-                    val data = eventClient.futureInsert(event, appId, channelId).map { id =>
-                      pluginsActorRef ! EventInfo(
-                        appId = appId,
-                        channelId = channelId,
-                        event = event)
-                      val status = StatusCodes.Created
-                      val result = Map(
-                        "status" -> status.intValue,
-                        "eventId" -> s"${id}")
-                      if (config.stats) {
-                        statsActorRef ! Bookkeeping(appId, status, event)
+
+              entity(as[Seq[Try[Event]]]) { events =>
+                complete {
+                  if (events.length <= MaxNumberOfEventsPerBatchRequest) {
+                    val eventWithIndex = events.zipWithIndex
+
+                    val taggedEvents = eventWithIndex.collect { case (Success(event), i) =>
+                      if(allowedEvents.isEmpty || allowedEvents.contains(event.event)){
+                        (Right(event), i)
+                      } else {
+                        (Left(event), i)
                       }
-                      result
-                    }.recover { case exception =>
+                    }
+
+                    val insertEvents = taggedEvents.collect { case (Right(event), i) =>
+                      (event, i)
+                    }
+
+                    insertEvents.foreach { case (event, i) =>
+                      pluginContext.inputBlockers.values.foreach(
+                        _.process(EventInfo(
+                          appId = appId,
+                          channelId = channelId,
+                          event = event), pluginContext))
+                    }
+
+                    val f: Future[Seq[Map[String, Any]]] = eventClient.futureInsertBatch(
+                      insertEvents.map(_._1), appId, channelId).map { insertResults =>
+                      val results = insertResults.zip(insertEvents).map { case (id, (event, i)) =>
+                        pluginsActorRef ! EventInfo(
+                          appId = appId,
+                          channelId = channelId,
+                          event = event)
+                        val status = StatusCodes.Created
+                        if (config.stats) {
+                          statsActorRef ! Bookkeeping(appId, status, event)
+                        }
+                        (Map(
+                          "status" -> status.intValue,
+                          "eventId" -> s"${id}"), i)
+                      } ++
+                        // Results of denied events
+                        taggedEvents.collect { case (Left(event), i) =>
+                          (Map(
+                            "status" -> StatusCodes.Forbidden.intValue,
+                            "message" -> s"${event.event} events are not allowed"), i)
+                        } ++
+                        // Results of failed to deserialze events
+                        eventWithIndex.collect { case (Failure(exception), i) =>
+                          (Map(
+                            "status" -> StatusCodes.BadRequest.intValue,
+                            "message" -> s"${exception.getMessage()}"), i)
+                        }
+
+                      // Restore original order
+                      results.sortBy { case (_, i) => i }.map { case (data, _) => data }
+                    }
+
+                    f.recover { case exception =>
                       Map(
                         "status" -> StatusCodes.InternalServerError.intValue,
                         "message" -> s"${exception.getMessage()}")
                     }
-                    data
-                  } else {
-                    Future.successful(Map(
-                      "status" -> StatusCodes.Forbidden.intValue,
-                      "message" -> s"${event.event} events are not allowed"))
-                  }
-                }
-                case Failure(exception) => {
-                  Future.successful(Map(
-                    "status" -> StatusCodes.BadRequest.intValue,
-                    "message" -> s"${exception.getMessage()}"))
-                }
-              }
 
-              entity(as[Seq[Try[Event]]]) { events =>
-                complete {
-                  if (events.length <= MaxNumberOfEventsPerBatchRequest) {
-                    Future.traverse(events)(handleEvent)
                   } else {
                     (StatusCodes.BadRequest,
                       Map("message" -> (s"Batch request must have less than or equal to " +

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
index f7a980c..65b8779 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala
@@ -91,6 +91,27 @@ trait LEvents {
     event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[String]
 
   /** :: DeveloperApi ::
+    * Insert [[Event]]s in a non-blocking fashion.
+    *
+    * Default implementation of this method is calling
+    * [[LEvents.futureInsert(Event, Int, Option[Int])]] per event.
+    * Override in the storage implementation if the storage has
+    * a better way to insert multiple data at once.
+    *
+    * @param events [[Event]]s to be inserted
+    * @param appId App ID for the [[Event]]s to be inserted to
+    * @param channelId Optional channel ID for the [[Event]]s to be inserted to
+    */
+  @DeveloperApi
+  def futureInsertBatch(events: Seq[Event], appId: Int, channelId: Option[Int])
+    (implicit ec: ExecutionContext): Future[Seq[String]] = {
+    val seq = events.map { event =>
+      futureInsert(event, appId, channelId)
+    }
+    Future.sequence(seq)
+  }
+
+  /** :: DeveloperApi ::
     * Get an [[Event]] in a non-blocking fashion.
     *
     * @param eventId ID of the [[Event]]

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index 6cf7a9a..6240059 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -22,8 +22,7 @@ import java.io.IOException
 import scala.collection.JavaConverters._
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
-
-import org.apache.http.entity.ContentType
+import org.apache.http.entity.{ContentType, StringEntity}
 import org.apache.http.nio.entity.NStringEntity
 import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.Event
@@ -34,13 +33,11 @@ import org.joda.time.DateTime
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
 import org.json4s.native.Serialization.write
 import org.json4s.ext.JodaTimeSerializers
-
 import grizzled.slf4j.Logging
 import org.elasticsearch.client.ResponseException
-import org.apache.http.entity.StringEntity
+import org.apache.http.message.BasicHeader
 
 class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String)
     extends LEvents with Logging {
@@ -130,7 +127,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
           ("prId" -> event.prId) ~
           ("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~
           ("properties" -> write(event.properties.toJObject))
-        val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON);
+        val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
         val response = restClient.performRequest(
           "POST",
           s"/$index/$estype/$id",
@@ -153,6 +150,73 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
     }
   }
 
+  override def futureInsertBatch(
+    events: Seq[Event],
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[Seq[String]] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      try {
+        val ids = events.map { event =>
+          event.eventId.getOrElse(ESEventsUtil.getBase64UUID)
+        }
+
+        val json = events.zip(ids).map { case (event, id) =>
+          val commandJson =
+            ("index" -> (
+              ("_index" -> index) ~
+              ("_type" -> estype) ~
+              ("_id" -> id)
+            ))
+
+          val documentJson =
+            ("eventId" -> id) ~
+            ("event" -> event.event) ~
+            ("entityType" -> event.entityType) ~
+            ("entityId" -> event.entityId) ~
+            ("targetEntityType" -> event.targetEntityType) ~
+            ("targetEntityId" -> event.targetEntityId) ~
+            ("eventTime" -> ESUtils.formatUTCDateTime(event.eventTime)) ~
+            ("tags" -> event.tags) ~
+            ("prId" -> event.prId) ~
+            ("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~
+            ("properties" -> write(event.properties.toJObject))
+
+          compact(render(commandJson)) + "\n" + compact(render(documentJson))
+
+        }.mkString("", "\n", "\n")
+
+        val entity = new StringEntity(json)
+        val response = restClient.performRequest(
+          "POST",
+          "/_bulk",
+          Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
+          entity,
+          new BasicHeader("Content-Type", "application/x-ndjson"))
+
+        val responseJson = parse(EntityUtils.toString(response.getEntity))
+        val items = (responseJson \ "items").asInstanceOf[JArray]
+
+        items.arr.map { case value: JObject =>
+          val result = (value \ "index" \ "result").extract[String]
+          val id = (value \ "index" \ "_id").extract[String]
+
+          result match {
+            case "created" => id
+            case "updated" => id
+            case _ =>
+              error(s"[$result] Failed to update $index/$estype/$id")
+              ""
+          }
+        }
+      } catch {
+        case e: IOException =>
+          error(s"Failed to update $index/$estype/<id>", e)
+          Nil
+      }
+    }
+  }
+
   private def exists(restClient: RestClient, estype: String, id: Int): Boolean = {
     try {
       restClient.performRequest(

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
index 360b007..e95e7e8 100644
--- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
@@ -110,6 +110,20 @@ class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace
   }
 
   override
+  def futureInsertBatch(
+    events: Seq[Event], appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
+    Future[Seq[String]] = {
+    Future {
+      val table = getTable(appId, channelId)
+      val (puts, rowKeys) = events.map { event => HBEventsUtil.eventToPut(event, appId) }.unzip
+      table.put(puts)
+      table.flushCommits()
+      table.close()
+      rowKeys.map(_.toString)
+    }
+  }
+
+  override
   def futureGet(
     eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
     Future[Option[Event]] = {

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
index dddef67..b4230cc 100644
--- a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
@@ -40,7 +40,7 @@ class JDBCLEvents(
     namespace: String) extends LEvents with Logging {
   implicit private val formats = org.json4s.DefaultFormats
 
-  def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+  override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
 
     // To use index, it must be varchar less than 255 characters on a VARCHAR column
     val useIndex = config.properties.contains("INDEX") &&
@@ -91,7 +91,7 @@ class JDBCLEvents(
     }
   }
 
-  def remove(appId: Int, channelId: Option[Int] = None): Boolean =
+  override def remove(appId: Int, channelId: Option[Int] = None): Boolean =
     DB autoCommit { implicit session =>
       SQL(s"""
       drop table ${JDBCUtils.eventTableName(namespace, appId, channelId)}
@@ -99,9 +99,9 @@ class JDBCLEvents(
       true
     }
 
-  def close(): Unit = ConnectionPool.closeAll()
+  override def close(): Unit = ConnectionPool.closeAll()
 
-  def futureInsert(event: Event, appId: Int, channelId: Option[Int])(
+  override def futureInsert(event: Event, appId: Int, channelId: Option[Int])(
     implicit ec: ExecutionContext): Future[String] = Future {
     DB localTx { implicit session =>
       val id = event.eventId.getOrElse(JDBCUtils.generateId)
@@ -127,7 +127,52 @@ class JDBCLEvents(
     }
   }
 
-  def futureGet(eventId: String, appId: Int, channelId: Option[Int])(
+  override def futureInsertBatch(events: Seq[Event], appId: Int, channelId: Option[Int])(
+    implicit ec: ExecutionContext): Future[Seq[String]] = Future {
+    DB localTx { implicit session =>
+      val ids = events.map(_.eventId.getOrElse(JDBCUtils.generateId))
+      val params = events.zip(ids).map { case (event, id) =>
+        Seq(
+          'id               -> id,
+          'event            -> event.event,
+          'entityType       -> event.entityType,
+          'entityId         -> event.entityId,
+          'targetEntityType -> event.targetEntityType,
+          'targetEntityId   -> event.targetEntityId,
+          'properties       -> write(event.properties.toJObject),
+          'eventTime        -> event.eventTime,
+          'eventTimeZone    -> event.eventTime.getZone.getID,
+          'tags             -> (if(event.tags.nonEmpty) Some(event.tags.mkString(",")) else None),
+          'prId             -> event.prId,
+          'creationTime     -> event.creationTime,
+          'creationTimeZone -> event.creationTime.getZone.getID
+        )
+      }
+
+      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+      sql"""
+      insert into $tableName values(
+        {id},
+        {event},
+        {entityType},
+        {entityId},
+        {targetEntityType},
+        {targetEntityId},
+        {properties},
+        {eventTime},
+        {eventTimeZone},
+        {tags},
+        {prId},
+        {creationTime},
+        {creationTimeZone}
+      )
+      """.batchByName(params: _*).apply()
+
+      ids
+    }
+  }
+
+  override def futureGet(eventId: String, appId: Int, channelId: Option[Int])(
     implicit ec: ExecutionContext): Future[Option[Event]] = Future {
     DB readOnly { implicit session =>
       val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
@@ -152,7 +197,7 @@ class JDBCLEvents(
     }
   }
 
-  def futureDelete(eventId: String, appId: Int, channelId: Option[Int])(
+  override def futureDelete(eventId: String, appId: Int, channelId: Option[Int])(
     implicit ec: ExecutionContext): Future[Boolean] = Future {
     DB localTx { implicit session =>
       val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
@@ -163,7 +208,7 @@ class JDBCLEvents(
     }
   }
 
-  def futureFind(
+  override def futureFind(
       appId: Int,
       channelId: Option[Int] = None,
       startTime: Option[DateTime] = None,