You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/05/03 16:07:15 UTC
incubator-predictionio git commit: ES storage improvement/refactoring
Repository: incubator-predictionio
Updated Branches:
refs/heads/develop ada0591f2 -> 945139be0
ES storage improvement/refactoring
- Use the same ID generation for ES in event data
- Reuse RestClient instance in ESLEvents
- Replace 0 with false in mapping
- Set Content-Type as application/json
Closes #374
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/945139be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/945139be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/945139be
Branch: refs/heads/develop
Commit: 945139be0a68e0733a3ab91d4ea4e4bfbde97c6f
Parents: ada0591
Author: Shinsuke Sugaya <sh...@yahoo.co.jp>
Authored: Wed May 3 09:06:05 2017 -0700
Committer: Donald Szeto <do...@apache.org>
Committed: Wed May 3 09:06:05 2017 -0700
----------------------------------------------------------------------
.../storage/elasticsearch/ESAccessKeys.scala | 2 +-
.../data/storage/elasticsearch/ESApps.scala | 20 +++--
.../data/storage/elasticsearch/ESChannels.scala | 20 +++--
.../elasticsearch/ESEngineInstances.scala | 2 +-
.../elasticsearch/ESEvaluationInstances.scala | 24 ++---
.../storage/elasticsearch/ESEventsUtil.scala | 95 +++++++++++++++++++-
.../data/storage/elasticsearch/ESLEvents.scala | 69 +++++---------
.../storage/elasticsearch/ESSequences.scala | 10 +--
.../data/storage/elasticsearch/ESUtils.scala | 8 +-
9 files changed, 169 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 9278366..98c2781 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -49,7 +49,7 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
ESUtils.getNumberOfReplicas(config, index.toUpperCase))
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> 0)) ~
+ ("_all" -> ("enabled" -> false)) ~
("properties" ->
("key" -> ("type" -> "keyword")) ~
("events" -> ("type" -> "keyword"))))
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index e7fe4af..0b319ab 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -50,7 +50,7 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
ESUtils.getNumberOfReplicas(config, index.toUpperCase))
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> 0)) ~
+ ("_all" -> ("enabled" -> false)) ~
("properties" ->
("id" -> ("type" -> "keyword")) ~
("name" -> ("type" -> "keyword"))))
@@ -60,12 +60,18 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
}
def insert(app: App): Option[Int] = {
- val id =
- if (app.id == 0) {
- var roll = seq.genNext(estype)
- while (!get(roll).isEmpty) roll = seq.genNext(estype)
- roll
- } else app.id
+ val id = app.id match {
+ case v if v == 0 =>
+ @scala.annotation.tailrec
+ def generateId: Int = {
+ seq.genNext(estype).toInt match {
+ case x if !get(x).isEmpty => generateId
+ case x => x
+ }
+ }
+ generateId
+ case v => v
+ }
update(app.copy(id = id))
Some(id)
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index a173c59..c142beb 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -49,7 +49,7 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
ESUtils.getNumberOfReplicas(config, index.toUpperCase))
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> 0)) ~
+ ("_all" -> ("enabled" -> false)) ~
("properties" ->
("name" -> ("type" -> "keyword"))))
ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
@@ -58,12 +58,18 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
}
def insert(channel: Channel): Option[Int] = {
- val id =
- if (channel.id == 0) {
- var roll = seq.genNext(estype)
- while (!get(roll).isEmpty) roll = seq.genNext(estype)
- roll
- } else channel.id
+ val id = channel.id match {
+ case v if v == 0 =>
+ @scala.annotation.tailrec
+ def generateId: Int = {
+ seq.genNext(estype).toInt match {
+ case x if !get(x).isEmpty => generateId
+ case x => x
+ }
+ }
+ generateId
+ case v => v
+ }
if (update(channel.copy(id = id))) Some(id) else None
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index e123744..de474c1 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -49,7 +49,7 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
ESUtils.getNumberOfReplicas(config, index.toUpperCase))
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> 0)) ~
+ ("_all" -> ("enabled" -> false)) ~
("properties" ->
("status" -> ("type" -> "keyword")) ~
("startTime" -> ("type" -> "date")) ~
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 48f191a..9b19cf4 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -51,7 +51,7 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
ESUtils.getNumberOfReplicas(config, index.toUpperCase))
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> 0)) ~
+ ("_all" -> ("enabled" -> false)) ~
("properties" ->
("status" -> ("type" -> "keyword")) ~
("startTime" -> ("type" -> "date")) ~
@@ -59,9 +59,9 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
("evaluationClass" -> ("type" -> "keyword")) ~
("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
("batch" -> ("type" -> "keyword")) ~
- ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~
- ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~
- ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no"))))
+ ("evaluatorResults" -> ("type" -> "text")) ~
+ ("evaluatorResultsHTML" -> ("enabled" -> false)) ~
+ ("evaluatorResultsJSON" -> ("enabled" -> false))))
ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
} finally {
restClient.close()
@@ -69,13 +69,17 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
def insert(i: EvaluationInstance): String = {
val id = i.id match {
- case x if x.isEmpty =>
- var roll = seq.genNext(estype).toString
- while (!get(roll).isEmpty) roll = seq.genNext(estype).toString
- roll
- case x => x
+ case v if v.isEmpty =>
+ @scala.annotation.tailrec
+ def generateId: String = {
+ seq.genNext(estype).toString match {
+ case x if !get(x).isEmpty => generateId
+ case x => x
+ }
+ }
+ generateId
+ case v => v
}
-
update(i.copy(id = id))
id
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
index 2edbc35..ec72a49 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
@@ -18,6 +18,13 @@
package org.apache.predictionio.data.storage.elasticsearch
+import java.net.NetworkInterface
+import java.net.SocketException
+import java.security.SecureRandom
+import java.util.Base64
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.AtomicLong
+
import org.apache.hadoop.io.MapWritable
import org.apache.hadoop.io.Text
import org.apache.predictionio.data.storage.DataMap
@@ -27,6 +34,7 @@ import org.json4s._
import org.json4s.native.Serialization.read
import org.json4s.native.Serialization.write
+
object ESEventsUtil {
implicit val formats = DefaultFormats
@@ -80,7 +88,7 @@ object ESEventsUtil {
def eventToPut(event: Event, appId: Int): Map[String, Any] = {
Map(
- "eventId" -> event.eventId,
+ "eventId" -> event.eventId.getOrElse { getBase64UUID },
"event" -> event.event,
"entityType" -> event.entityType,
"entityId" -> event.entityId,
@@ -94,4 +102,89 @@ object ESEventsUtil {
)
}
+ val secureRandom: SecureRandom = new SecureRandom()
+
+ val sequenceNumber: AtomicInteger = new AtomicInteger(secureRandom.nextInt())
+
+ val lastTimestamp: AtomicLong = new AtomicLong(0)
+
+ val secureMungedAddress: Array[Byte] = {
+ val address = getMacAddress match {
+ case Some(x) => x
+ case None =>
+ val dummy: Array[Byte] = new Array[Byte](6)
+ secureRandom.nextBytes(dummy)
+ dummy(0) = (dummy(0) | 0x01.toByte).toByte
+ dummy
+ }
+
+ val mungedBytes: Array[Byte] = new Array[Byte](6)
+ secureRandom.nextBytes(mungedBytes)
+ for (i <- 0 until 6) {
+ mungedBytes(i) = (mungedBytes(i) ^ address(i)).toByte
+ }
+
+ mungedBytes
+ }
+
+ def getMacAddress(): Option[Array[Byte]] = {
+ try {
+ NetworkInterface.getNetworkInterfaces match {
+ case en if en == null => None
+ case en =>
+ new Iterator[NetworkInterface] {
+ def next = en.nextElement
+ def hasNext = en.hasMoreElements
+ }.foldLeft(None: Option[Array[Byte]])((x, y) =>
+ x match {
+ case None =>
+ y.isLoopback match {
+ case true =>
+ y.getHardwareAddress match {
+ case address if isValidAddress(address) => Some(address)
+ case _ => None
+ }
+ case false => None
+ }
+ case _ => x
+ })
+ }
+ } catch {
+ case e: SocketException => None
+ }
+ }
+
+ def isValidAddress(address: Array[Byte]): Boolean = {
+ address match {
+ case v if v == null || v.length != 6 => false
+ case v => v.exists(b => b != 0x00.toByte)
+ }
+ }
+
+ def putLong(array: Array[Byte], l: Long, pos: Int, numberOfLongBytes: Int): Unit = {
+ for (i <- 0 until numberOfLongBytes) {
+ array(pos + numberOfLongBytes - i - 1) = (l >>> (i * 8)).toByte
+ }
+ }
+
+ def getBase64UUID(): String = {
+ val sequenceId: Int = sequenceNumber.incrementAndGet & 0xffffff
+ val timestamp: Long = synchronized {
+ val t = Math.max(lastTimestamp.get, System.currentTimeMillis)
+ if (sequenceId == 0) {
+ lastTimestamp.set(t + 1)
+ } else {
+ lastTimestamp.set(t)
+ }
+ lastTimestamp.get
+ }
+
+ val uuidBytes: Array[Byte] = new Array[Byte](15)
+
+ putLong(uuidBytes, timestamp, 0, 6)
+ System.arraycopy(secureMungedAddress, 0, uuidBytes, 6, secureMungedAddress.length)
+ putLong(uuidBytes, sequenceId, 12, 3)
+
+ Base64.getUrlEncoder().withoutPadding().encodeToString(uuidBytes)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index 18e2fed..6cf7a9a 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -45,8 +45,7 @@ import org.apache.http.entity.StringEntity
class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String)
extends LEvents with Logging {
implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
- private val seq = new ESSequences(client, config, index)
- private val seqName = "events"
+ val restClient = client.open()
def getEsType(appId: Int, channelId: Option[Int] = None): String = {
channelId.map { ch =>
@@ -58,37 +57,31 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
val estype = getEsType(appId, channelId)
- val restClient = client.open()
- try {
- ESUtils.createIndex(restClient, index,
- ESUtils.getNumberOfShards(config, index.toUpperCase),
- ESUtils.getNumberOfReplicas(config, index.toUpperCase))
- val json =
- (estype ->
- ("_all" -> ("enabled" -> 0)) ~
- ("properties" ->
- ("name" -> ("type" -> "keyword")) ~
- ("eventId" -> ("type" -> "keyword")) ~
- ("event" -> ("type" -> "keyword")) ~
- ("entityType" -> ("type" -> "keyword")) ~
- ("entityId" -> ("type" -> "keyword")) ~
- ("targetEntityType" -> ("type" -> "keyword")) ~
- ("targetEntityId" -> ("type" -> "keyword")) ~
- ("properties" -> ("enabled" -> 0)) ~
- ("eventTime" -> ("type" -> "date")) ~
- ("tags" -> ("type" -> "keyword")) ~
- ("prId" -> ("type" -> "keyword")) ~
- ("creationTime" -> ("type" -> "date"))))
- ESUtils.createMapping(restClient, index, estype, compact(render(json)))
- } finally {
- restClient.close()
- }
+ ESUtils.createIndex(restClient, index,
+ ESUtils.getNumberOfShards(config, index.toUpperCase),
+ ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+ val json =
+ (estype ->
+ ("_all" -> ("enabled" -> false)) ~
+ ("properties" ->
+ ("name" -> ("type" -> "keyword")) ~
+ ("eventId" -> ("type" -> "keyword")) ~
+ ("event" -> ("type" -> "keyword")) ~
+ ("entityType" -> ("type" -> "keyword")) ~
+ ("entityId" -> ("type" -> "keyword")) ~
+ ("targetEntityType" -> ("type" -> "keyword")) ~
+ ("targetEntityId" -> ("type" -> "keyword")) ~
+ ("properties" -> ("enabled" -> false)) ~
+ ("eventTime" -> ("type" -> "date")) ~
+ ("tags" -> ("type" -> "keyword")) ~
+ ("prId" -> ("type" -> "keyword")) ~
+ ("creationTime" -> ("type" -> "date"))))
+ ESUtils.createMapping(restClient, index, estype, compact(render(json)))
true
}
override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
val estype = getEsType(appId, channelId)
- val restClient = client.open()
try {
val json =
("query" ->
@@ -108,13 +101,11 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
case e: Exception =>
error(s"Failed to remove $index/$estype", e)
false
- } finally {
- restClient.close()
}
}
override def close(): Unit = {
- // nothing
+ restClient.close()
}
override def futureInsert(
@@ -123,12 +114,9 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
Future {
val estype = getEsType(appId, channelId)
- val restClient = client.open()
try {
val id = event.eventId.getOrElse {
- var roll = seq.genNext(seqName)
- while (exists(restClient, estype, roll)) roll = seq.genNext(seqName)
- roll.toString
+ ESEventsUtil.getBase64UUID
}
val json =
("eventId" -> id) ~
@@ -161,8 +149,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
case e: IOException =>
error(s"Failed to update $index/$estype/<id>", e)
""
- } finally {
- restClient.close()
}
}
}
@@ -196,7 +182,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
Future {
val estype = getEsType(appId, channelId)
- val restClient = client.open()
try {
val json =
("query" ->
@@ -220,8 +205,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
None
- } finally {
- restClient.close()
}
}
}
@@ -232,7 +215,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
Future {
val estype = getEsType(appId, channelId)
- val restClient = client.open()
try {
val json =
("query" ->
@@ -255,8 +237,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
case e: IOException =>
error(s"Failed to update $index/$estype:$eventId", e)
false
- } finally {
- restClient.close()
}
}
}
@@ -276,7 +256,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
(implicit ec: ExecutionContext): Future[Iterator[Event]] = {
Future {
val estype = getEsType(appId, channelId)
- val restClient = client.open()
try {
val query = ESUtils.createEventQuery(
startTime, untilTime, entityType, entityId,
@@ -289,8 +268,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
case e: IOException =>
error(e.getMessage)
Iterator[Event]()
- } finally {
- restClient.close()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index 8a6a047..9fd31a3 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -46,15 +46,15 @@ class ESSequences(client: ESClient, config: StorageClientConfig, index: String)
ESUtils.getNumberOfReplicas(config, index.toUpperCase))
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> 0)) ~
+ ("_all" -> ("enabled" -> false)) ~
("properties" ->
- ("n" -> ("enabled" -> 0))))
+ ("n" -> ("enabled" -> false))))
ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
} finally {
restClient.close()
}
- def genNext(name: String): Int = {
+ def genNext(name: String): Long = {
val restClient = client.open()
try {
val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
@@ -67,9 +67,9 @@ class ESSequences(client: ESClient, config: StorageClientConfig, index: String)
val result = (jsonResponse \ "result").extract[String]
result match {
case "created" =>
- (jsonResponse \ "_version").extract[Int]
+ (jsonResponse \ "_version").extract[Long]
case "updated" =>
- (jsonResponse \ "_version").extract[Int]
+ (jsonResponse \ "_version").extract[Long]
case _ =>
throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name")
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index 34c76eb..2bf18ef 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -100,11 +100,12 @@ object ESUtils {
query: String,
size: Int)(
implicit formats: Formats): Seq[JValue] = {
+ val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
val response = client.performRequest(
"POST",
s"/$index/$estype/_search",
Map("size" -> s"${size}"),
- new StringEntity(query))
+ entity)
val responseJValue = parse(EntityUtils.toString(response.getEntity))
val hits = (responseJValue \ "hits" \ "hits").extract[Seq[JValue]]
hits.map(h => (h \ "_source"))
@@ -140,7 +141,7 @@ object ESUtils {
if (hits.isEmpty) results
else {
val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId)
- val scrollBody = new StringEntity(compact(render(json)))
+ val scrollBody = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
val response = client.performRequest(
"POST",
"/_search/scroll",
@@ -153,11 +154,12 @@ object ESUtils {
}
}
+ val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
val response = client.performRequest(
"POST",
s"/$index/$estype/_search",
Map("scroll" -> scrollLife),
- new StringEntity(query))
+ entity)
val responseJValue = parse(EntityUtils.toString(response.getEntity))
scroll((responseJValue \ "_scroll_id").extract[String],
(responseJValue \ "hits" \ "hits").extract[Seq[JValue]],