You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this rendering.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/03/14 22:09:23 UTC

incubator-predictionio git commit: Update event serialization for ES5

Repository: incubator-predictionio
Updated Branches:
  refs/heads/develop 463939348 -> ae51040ba


Update event serialization for ES5

Closes #358


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/ae51040b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/ae51040b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/ae51040b

Branch: refs/heads/develop
Commit: ae51040baaebd30608198841b7d4caee2c4c5cd0
Parents: 4639393
Author: Shinsuke Sugaya <sh...@yahoo.co.jp>
Authored: Tue Mar 14 15:08:33 2017 -0700
Committer: Donald Szeto <do...@apache.org>
Committed: Tue Mar 14 15:08:33 2017 -0700

----------------------------------------------------------------------
 .../storage/elasticsearch/ESEventsUtil.scala    |  44 ++------
 .../data/storage/elasticsearch/ESLEvents.scala  |  35 ++++---
 .../data/storage/elasticsearch/ESUtils.scala    | 103 +++++++++++++++++--
 3 files changed, 121 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/ae51040b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
index 56f47ab..2edbc35 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
@@ -18,16 +18,14 @@
 
 package org.apache.predictionio.data.storage.elasticsearch
 
-import org.apache.hadoop.io.DoubleWritable
-import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.MapWritable
 import org.apache.hadoop.io.Text
 import org.apache.predictionio.data.storage.DataMap
 import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.EventValidation
 import org.joda.time.DateTime
-import org.joda.time.DateTimeZone
 import org.json4s._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
 
 object ESEventsUtil {
 
@@ -53,23 +51,8 @@ object ESEventsUtil {
       }
     }
 
-    val tmp = result
-      .get(new Text("properties")).asInstanceOf[MapWritable]
-      .get(new Text("fields")).asInstanceOf[MapWritable]
-      .get(new Text("rating"))
-
-    val rating =
-      if (tmp.isInstanceOf[DoubleWritable]) tmp.asInstanceOf[DoubleWritable]
-      else if (tmp.isInstanceOf[LongWritable]) {
-        new DoubleWritable(tmp.asInstanceOf[LongWritable].get().toDouble)
-      }
-      else null
-
-    val properties: DataMap =
-      if (rating != null) DataMap(s"""{"rating":${rating.get().toString}}""")
-      else DataMap()
-
-
+    val properties: DataMap = getOptStringCol("properties")
+      .map(s => DataMap(read[JObject](s))).getOrElse(DataMap())
     val eventId = Some(getStringCol("eventId"))
     val event = getStringCol("event")
     val entityType = getStringCol("entityType")
@@ -77,17 +60,8 @@ object ESEventsUtil {
     val targetEntityType = getOptStringCol("targetEntityType")
     val targetEntityId = getOptStringCol("targetEntityId")
     val prId = getOptStringCol("prId")
-    val eventTimeZone = getOptStringCol("eventTimeZone")
-      .map(DateTimeZone.forID(_))
-      .getOrElse(EventValidation.defaultTimeZone)
-    val eventTime = new DateTime(
-      getStringCol("eventTime"), eventTimeZone)
-    val creationTimeZone = getOptStringCol("creationTimeZone")
-      .map(DateTimeZone.forID(_))
-      .getOrElse(EventValidation.defaultTimeZone)
-    val creationTime: DateTime = new DateTime(
-      getStringCol("creationTime"), creationTimeZone)
-
+    val eventTime: DateTime = ESUtils.parseUTCDateTime(getStringCol("eventTime"))
+    val creationTime: DateTime = ESUtils.parseUTCDateTime(getStringCol("creationTime"))
 
     Event(
       eventId = eventId,
@@ -112,11 +86,11 @@ object ESEventsUtil {
       "entityId" -> event.entityId,
       "targetEntityType" -> event.targetEntityType,
       "targetEntityId" -> event.targetEntityId,
-      "properties" -> event.properties.toJObject,
-      "eventTime" -> event.eventTime.toString,
+      "properties" -> write(event.properties.toJObject),
+      "eventTime" -> ESUtils.formatUTCDateTime(event.eventTime),
       "tags" -> event.tags,
       "prId" -> event.prId,
-      "creationTime" -> event.creationTime.toString
+      "creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)
     )
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/ae51040b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index fdd370a..809a064 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -72,17 +72,11 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
             ("entityId" -> ("type" -> "keyword")) ~
             ("targetEntityType" -> ("type" -> "keyword")) ~
             ("targetEntityId" -> ("type" -> "keyword")) ~
-            ("properties" ->
-              ("type" -> "nested") ~
-              ("properties" ->
-                ("fields" -> ("type" -> "nested") ~
-                  ("properties" ->
-                    ("user" -> ("type" -> "long")) ~
-                    ("num" -> ("type" -> "long")))))) ~
-                    ("eventTime" -> ("type" -> "date")) ~
-                    ("tags" -> ("type" -> "keyword")) ~
-                    ("prId" -> ("type" -> "keyword")) ~
-                    ("creationTime" -> ("type" -> "date"))))
+            ("properties" -> ("type" -> "keyword")) ~
+            ("eventTime" -> ("type" -> "date")) ~
+            ("tags" -> ("type" -> "keyword")) ~
+            ("prId" -> ("type" -> "keyword")) ~
+            ("creationTime" -> ("type" -> "date"))))
       ESUtils.createMapping(restClient, index, estype, compact(render(json)))
     } finally {
       restClient.close()
@@ -134,8 +128,19 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
           while (exists(restClient, estype, roll)) roll = seq.genNext(seqName)
           roll.toString
         }
-        val json = write(event.copy(eventId = Some(id)))
-        val entity = new NStringEntity(json, ContentType.APPLICATION_JSON);
+        val json =
+          ("eventId" -> id) ~
+          ("event" -> event.event) ~
+          ("entityType" -> event.entityType) ~
+          ("entityId" -> event.entityId) ~
+          ("targetEntityType" -> event.targetEntityType) ~
+          ("targetEntityId" -> event.targetEntityId) ~
+          ("eventTime" -> ESUtils.formatUTCDateTime(event.eventTime)) ~
+          ("tags" -> event.tags) ~
+          ("prId" -> event.prId) ~
+          ("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~
+          ("properties" -> write(event.properties.toJObject))
+        val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON);
         val response = restClient.performRequest(
           "POST",
           s"/$index/$estype/$id",
@@ -275,8 +280,8 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
           startTime, untilTime, entityType, entityId,
           eventNames, targetEntityType, targetEntityId, reversed)
         limit.getOrElse(20) match {
-          case -1 => ESUtils.getAll[Event](restClient, index, estype, query).toIterator
-          case size => ESUtils.get[Event](restClient, index, estype, query, size).toIterator
+          case -1 => ESUtils.getEventAll(restClient, index, estype, query).toIterator
+          case size => ESUtils.getEvents(restClient, index, estype, query, size).toIterator
         }
       } catch {
         case e: IOException =>

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/ae51040b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index 72f4dd6..4eb117e 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -34,17 +34,72 @@ import org.joda.time.format.DateTimeFormat
 import org.joda.time.DateTimeZone
 import org.apache.predictionio.data.storage.StorageClientConfig
 import org.apache.http.HttpHost
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.DataMap
 
 object ESUtils {
   val scrollLife = "1m"
 
-  def get[T: Manifest](
+  def toEvent(value: JValue)(
+    implicit formats: Formats): Event = {
+    def getString(s: String): String = {
+      (value \ s) match {
+        case x if x == JNothing => null
+        case x => x.extract[String]
+      }
+    }
+
+    def getOptString(s: String): Option[String] = {
+      getString(s) match {
+        case null => None
+        case x => Some(x)
+      }
+    }
+
+    val properties: DataMap = getOptString("properties")
+      .map(s => DataMap(read[JObject](s))).getOrElse(DataMap())
+    val eventId = getOptString("eventId")
+    val event = getString("event")
+    val entityType = getString("entityType")
+    val entityId = getString("entityId")
+    val targetEntityType = getOptString("targetEntityType")
+    val targetEntityId = getOptString("targetEntityId")
+    val prId = getOptString("prId")
+    val eventTime: DateTime = ESUtils.parseUTCDateTime(getString("eventTime"))
+    val creationTime: DateTime = ESUtils.parseUTCDateTime(getString("creationTime"))
+    val tags = (value \ "tags").extract[Seq[String]]
+
+    Event(
+      eventId = eventId,
+      event = event,
+      entityType = entityType,
+      entityId = entityId,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      properties = properties,
+      eventTime = eventTime,
+      tags = tags,
+      prId = prId,
+      creationTime = creationTime)
+  }
+
+  def getEvents(
     client: RestClient,
     index: String,
     estype: String,
     query: String,
     size: Int)(
-      implicit formats: Formats): Seq[T] = {
+      implicit formats: Formats): Seq[Event] = {
+    getDocList(client, index, estype, query, size).map(x => toEvent(x))
+  }
+
+  def getDocList(
+    client: RestClient,
+    index: String,
+    estype: String,
+    query: String,
+    size: Int)(
+      implicit formats: Formats): Seq[JValue] = {
     val response = client.performRequest(
       "POST",
       s"/$index/$estype/_search",
@@ -52,7 +107,7 @@ object ESUtils {
       new StringEntity(query))
     val responseJValue = parse(EntityUtils.toString(response.getEntity))
     val hits = (responseJValue \ "hits" \ "hits").extract[Seq[JValue]]
-    hits.map(h => (h \ "_source").extract[T])
+    hits.map(h => (h \ "_source"))
   }
 
   def getAll[T: Manifest](
@@ -61,9 +116,27 @@ object ESUtils {
     estype: String,
     query: String)(
       implicit formats: Formats): Seq[T] = {
+    getDocAll(client, index, estype, query).map(x => x.extract[T])
+  }
+
+  def getEventAll(
+    client: RestClient,
+    index: String,
+    estype: String,
+    query: String)(
+      implicit formats: Formats): Seq[Event] = {
+    getDocAll(client, index, estype, query).map(x => toEvent(x))
+  }
+
+  def getDocAll(
+    client: RestClient,
+    index: String,
+    estype: String,
+    query: String)(
+      implicit formats: Formats): Seq[JValue] = {
 
     @scala.annotation.tailrec
-    def scroll(scrollId: String, hits: Seq[JValue], results: Seq[T]): Seq[T] = {
+    def scroll(scrollId: String, hits: Seq[JValue], results: Seq[JValue]): Seq[JValue] = {
       if (hits.isEmpty) results
       else {
         val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId)
@@ -76,7 +149,7 @@ object ESUtils {
         val responseJValue = parse(EntityUtils.toString(response.getEntity))
         scroll((responseJValue \ "_scroll_id").extract[String],
           (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
-          hits.map(h => (h \ "_source").extract[T]) ++ results)
+          hits.map(h => (h \ "_source").extract[JValue]) ++ results)
       }
     }
 
@@ -87,8 +160,8 @@ object ESUtils {
       new StringEntity(query))
     val responseJValue = parse(EntityUtils.toString(response.getEntity))
     scroll((responseJValue \ "_scroll_id").extract[String],
-        (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
-        Nil)
+      (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+      Nil)
   }
 
   def createIndex(
@@ -131,6 +204,16 @@ object ESUtils {
       }
   }
 
+  def formatUTCDateTime(dt: DateTime): String = {
+    DateTimeFormat
+      .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(dt.withZone(DateTimeZone.UTC))
+  }
+
+  def parseUTCDateTime(str: String): DateTime = { // parses "...SSSZ" text; result is in UTC
+    DateTimeFormat // withZoneUTC: without it Joda converts the result to the JVM default zone
+      .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").withZoneUTC().parseDateTime(str)
+  }
+
   def createEventQuery(
     startTime: Option[DateTime] = None,
     untilTime: Option[DateTime] = None,
@@ -142,13 +225,11 @@ object ESUtils {
     reversed: Option[Boolean] = None): String = {
     val mustQueries = Seq(
       startTime.map(x => {
-        val v = DateTimeFormat
-          .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
+        val v = formatUTCDateTime(x)
         s"""{"range":{"eventTime":{"gte":"${v}"}}}"""
       }),
       untilTime.map(x => {
-        val v = DateTimeFormat
-          .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
+        val v = formatUTCDateTime(x)
         s"""{"range":{"eventTime":{"lt":"${v}"}}}"""
       }),
       entityType.map(x => s"""{"term":{"entityType":"${x}"}}"""),