You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/03/14 22:09:23 UTC
incubator-predictionio git commit: Update event serialization for ES5
Repository: incubator-predictionio
Updated Branches:
refs/heads/develop 463939348 -> ae51040ba
Update event serialization for ES5
Closes #358
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/ae51040b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/ae51040b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/ae51040b
Branch: refs/heads/develop
Commit: ae51040baaebd30608198841b7d4caee2c4c5cd0
Parents: 4639393
Author: Shinsuke Sugaya <sh...@yahoo.co.jp>
Authored: Tue Mar 14 15:08:33 2017 -0700
Committer: Donald Szeto <do...@apache.org>
Committed: Tue Mar 14 15:08:33 2017 -0700
----------------------------------------------------------------------
.../storage/elasticsearch/ESEventsUtil.scala | 44 ++------
.../data/storage/elasticsearch/ESLEvents.scala | 35 ++++---
.../data/storage/elasticsearch/ESUtils.scala | 103 +++++++++++++++++--
3 files changed, 121 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/ae51040b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
index 56f47ab..2edbc35 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
@@ -18,16 +18,14 @@
package org.apache.predictionio.data.storage.elasticsearch
-import org.apache.hadoop.io.DoubleWritable
-import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.MapWritable
import org.apache.hadoop.io.Text
import org.apache.predictionio.data.storage.DataMap
import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.EventValidation
import org.joda.time.DateTime
-import org.joda.time.DateTimeZone
import org.json4s._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
object ESEventsUtil {
@@ -53,23 +51,8 @@ object ESEventsUtil {
}
}
- val tmp = result
- .get(new Text("properties")).asInstanceOf[MapWritable]
- .get(new Text("fields")).asInstanceOf[MapWritable]
- .get(new Text("rating"))
-
- val rating =
- if (tmp.isInstanceOf[DoubleWritable]) tmp.asInstanceOf[DoubleWritable]
- else if (tmp.isInstanceOf[LongWritable]) {
- new DoubleWritable(tmp.asInstanceOf[LongWritable].get().toDouble)
- }
- else null
-
- val properties: DataMap =
- if (rating != null) DataMap(s"""{"rating":${rating.get().toString}}""")
- else DataMap()
-
-
+ val properties: DataMap = getOptStringCol("properties")
+ .map(s => DataMap(read[JObject](s))).getOrElse(DataMap())
val eventId = Some(getStringCol("eventId"))
val event = getStringCol("event")
val entityType = getStringCol("entityType")
@@ -77,17 +60,8 @@ object ESEventsUtil {
val targetEntityType = getOptStringCol("targetEntityType")
val targetEntityId = getOptStringCol("targetEntityId")
val prId = getOptStringCol("prId")
- val eventTimeZone = getOptStringCol("eventTimeZone")
- .map(DateTimeZone.forID(_))
- .getOrElse(EventValidation.defaultTimeZone)
- val eventTime = new DateTime(
- getStringCol("eventTime"), eventTimeZone)
- val creationTimeZone = getOptStringCol("creationTimeZone")
- .map(DateTimeZone.forID(_))
- .getOrElse(EventValidation.defaultTimeZone)
- val creationTime: DateTime = new DateTime(
- getStringCol("creationTime"), creationTimeZone)
-
+ val eventTime: DateTime = ESUtils.parseUTCDateTime(getStringCol("eventTime"))
+ val creationTime: DateTime = ESUtils.parseUTCDateTime(getStringCol("creationTime"))
Event(
eventId = eventId,
@@ -112,11 +86,11 @@ object ESEventsUtil {
"entityId" -> event.entityId,
"targetEntityType" -> event.targetEntityType,
"targetEntityId" -> event.targetEntityId,
- "properties" -> event.properties.toJObject,
- "eventTime" -> event.eventTime.toString,
+ "properties" -> write(event.properties.toJObject),
+ "eventTime" -> ESUtils.formatUTCDateTime(event.eventTime),
"tags" -> event.tags,
"prId" -> event.prId,
- "creationTime" -> event.creationTime.toString
+ "creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)
)
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/ae51040b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index fdd370a..809a064 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -72,17 +72,11 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
("entityId" -> ("type" -> "keyword")) ~
("targetEntityType" -> ("type" -> "keyword")) ~
("targetEntityId" -> ("type" -> "keyword")) ~
- ("properties" ->
- ("type" -> "nested") ~
- ("properties" ->
- ("fields" -> ("type" -> "nested") ~
- ("properties" ->
- ("user" -> ("type" -> "long")) ~
- ("num" -> ("type" -> "long")))))) ~
- ("eventTime" -> ("type" -> "date")) ~
- ("tags" -> ("type" -> "keyword")) ~
- ("prId" -> ("type" -> "keyword")) ~
- ("creationTime" -> ("type" -> "date"))))
+ ("properties" -> ("type" -> "keyword")) ~
+ ("eventTime" -> ("type" -> "date")) ~
+ ("tags" -> ("type" -> "keyword")) ~
+ ("prId" -> ("type" -> "keyword")) ~
+ ("creationTime" -> ("type" -> "date"))))
ESUtils.createMapping(restClient, index, estype, compact(render(json)))
} finally {
restClient.close()
@@ -134,8 +128,19 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
while (exists(restClient, estype, roll)) roll = seq.genNext(seqName)
roll.toString
}
- val json = write(event.copy(eventId = Some(id)))
- val entity = new NStringEntity(json, ContentType.APPLICATION_JSON);
+ val json =
+ ("eventId" -> id) ~
+ ("event" -> event.event) ~
+ ("entityType" -> event.entityType) ~
+ ("entityId" -> event.entityId) ~
+ ("targetEntityType" -> event.targetEntityType) ~
+ ("targetEntityId" -> event.targetEntityId) ~
+ ("eventTime" -> ESUtils.formatUTCDateTime(event.eventTime)) ~
+ ("tags" -> event.tags) ~
+ ("prId" -> event.prId) ~
+ ("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~
+ ("properties" -> write(event.properties.toJObject))
+ val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON);
val response = restClient.performRequest(
"POST",
s"/$index/$estype/$id",
@@ -275,8 +280,8 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
startTime, untilTime, entityType, entityId,
eventNames, targetEntityType, targetEntityId, reversed)
limit.getOrElse(20) match {
- case -1 => ESUtils.getAll[Event](restClient, index, estype, query).toIterator
- case size => ESUtils.get[Event](restClient, index, estype, query, size).toIterator
+ case -1 => ESUtils.getEventAll(restClient, index, estype, query).toIterator
+ case size => ESUtils.getEvents(restClient, index, estype, query, size).toIterator
}
} catch {
case e: IOException =>
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/ae51040b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index 72f4dd6..4eb117e 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -34,17 +34,72 @@ import org.joda.time.format.DateTimeFormat
import org.joda.time.DateTimeZone
import org.apache.predictionio.data.storage.StorageClientConfig
import org.apache.http.HttpHost
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.DataMap
object ESUtils {
val scrollLife = "1m"
- def get[T: Manifest](
+ def toEvent(value: JValue)(
+ implicit formats: Formats): Event = {
+ def getString(s: String): String = {
+ (value \ s) match {
+ case x if x == JNothing => null
+ case x => x.extract[String]
+ }
+ }
+
+ def getOptString(s: String): Option[String] = {
+ getString(s) match {
+ case null => None
+ case x => Some(x)
+ }
+ }
+
+ val properties: DataMap = getOptString("properties")
+ .map(s => DataMap(read[JObject](s))).getOrElse(DataMap())
+ val eventId = getOptString("eventId")
+ val event = getString("event")
+ val entityType = getString("entityType")
+ val entityId = getString("entityId")
+ val targetEntityType = getOptString("targetEntityType")
+ val targetEntityId = getOptString("targetEntityId")
+ val prId = getOptString("prId")
+ val eventTime: DateTime = ESUtils.parseUTCDateTime(getString("eventTime"))
+ val creationTime: DateTime = ESUtils.parseUTCDateTime(getString("creationTime"))
+ val tags = (value \ "tags").extract[Seq[String]]
+
+ Event(
+ eventId = eventId,
+ event = event,
+ entityType = entityType,
+ entityId = entityId,
+ targetEntityType = targetEntityType,
+ targetEntityId = targetEntityId,
+ properties = properties,
+ eventTime = eventTime,
+ tags = tags,
+ prId = prId,
+ creationTime = creationTime)
+ }
+
+ def getEvents(
client: RestClient,
index: String,
estype: String,
query: String,
size: Int)(
- implicit formats: Formats): Seq[T] = {
+ implicit formats: Formats): Seq[Event] = {
+ getDocList(client, index, estype, query, size).map(x => toEvent(x))
+ }
+
+ def getDocList(
+ client: RestClient,
+ index: String,
+ estype: String,
+ query: String,
+ size: Int)(
+ implicit formats: Formats): Seq[JValue] = {
val response = client.performRequest(
"POST",
s"/$index/$estype/_search",
@@ -52,7 +107,7 @@ object ESUtils {
new StringEntity(query))
val responseJValue = parse(EntityUtils.toString(response.getEntity))
val hits = (responseJValue \ "hits" \ "hits").extract[Seq[JValue]]
- hits.map(h => (h \ "_source").extract[T])
+ hits.map(h => (h \ "_source"))
}
def getAll[T: Manifest](
@@ -61,9 +116,27 @@ object ESUtils {
estype: String,
query: String)(
implicit formats: Formats): Seq[T] = {
+ getDocAll(client, index, estype, query).map(x => x.extract[T])
+ }
+
+ def getEventAll(
+ client: RestClient,
+ index: String,
+ estype: String,
+ query: String)(
+ implicit formats: Formats): Seq[Event] = {
+ getDocAll(client, index, estype, query).map(x => toEvent(x))
+ }
+
+ def getDocAll(
+ client: RestClient,
+ index: String,
+ estype: String,
+ query: String)(
+ implicit formats: Formats): Seq[JValue] = {
@scala.annotation.tailrec
- def scroll(scrollId: String, hits: Seq[JValue], results: Seq[T]): Seq[T] = {
+ def scroll(scrollId: String, hits: Seq[JValue], results: Seq[JValue]): Seq[JValue] = {
if (hits.isEmpty) results
else {
val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId)
@@ -76,7 +149,7 @@ object ESUtils {
val responseJValue = parse(EntityUtils.toString(response.getEntity))
scroll((responseJValue \ "_scroll_id").extract[String],
(responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
- hits.map(h => (h \ "_source").extract[T]) ++ results)
+ hits.map(h => (h \ "_source").extract[JValue]) ++ results)
}
}
@@ -87,8 +160,8 @@ object ESUtils {
new StringEntity(query))
val responseJValue = parse(EntityUtils.toString(response.getEntity))
scroll((responseJValue \ "_scroll_id").extract[String],
- (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
- Nil)
+ (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+ Nil)
}
def createIndex(
@@ -131,6 +204,16 @@ object ESUtils {
}
}
+ def formatUTCDateTime(dt: DateTime): String = {
+ DateTimeFormat
+ .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(dt.withZone(DateTimeZone.UTC))
+ }
+
+ def parseUTCDateTime(str: String): DateTime = {
+ DateTimeFormat
+ .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").parseDateTime(str)
+ }
+
def createEventQuery(
startTime: Option[DateTime] = None,
untilTime: Option[DateTime] = None,
@@ -142,13 +225,11 @@ object ESUtils {
reversed: Option[Boolean] = None): String = {
val mustQueries = Seq(
startTime.map(x => {
- val v = DateTimeFormat
- .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
+ val v = formatUTCDateTime(x)
s"""{"range":{"eventTime":{"gte":"${v}"}}}"""
}),
untilTime.map(x => {
- val v = DateTimeFormat
- .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
+ val v = formatUTCDateTime(x)
s"""{"range":{"eventTime":{"lt":"${v}"}}}"""
}),
entityType.map(x => s"""{"term":{"entityType":"${x}"}}"""),