You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text version.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/05/03 16:07:15 UTC

incubator-predictionio git commit: ES storage improvement/refactoring

Repository: incubator-predictionio
Updated Branches:
  refs/heads/develop ada0591f2 -> 945139be0


ES storage improvement/refactoring

- Use the same ID generation for ES in event data
- Reuse RestClient instance in ESLEvents
- Replace 0 with false for the "enabled" flag in mappings
- Set Content-Type to application/json

Closes #374


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/945139be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/945139be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/945139be

Branch: refs/heads/develop
Commit: 945139be0a68e0733a3ab91d4ea4e4bfbde97c6f
Parents: ada0591
Author: Shinsuke Sugaya <sh...@yahoo.co.jp>
Authored: Wed May 3 09:06:05 2017 -0700
Committer: Donald Szeto <do...@apache.org>
Committed: Wed May 3 09:06:05 2017 -0700

----------------------------------------------------------------------
 .../storage/elasticsearch/ESAccessKeys.scala    |  2 +-
 .../data/storage/elasticsearch/ESApps.scala     | 20 +++--
 .../data/storage/elasticsearch/ESChannels.scala | 20 +++--
 .../elasticsearch/ESEngineInstances.scala       |  2 +-
 .../elasticsearch/ESEvaluationInstances.scala   | 24 ++---
 .../storage/elasticsearch/ESEventsUtil.scala    | 95 +++++++++++++++++++-
 .../data/storage/elasticsearch/ESLEvents.scala  | 69 +++++---------
 .../storage/elasticsearch/ESSequences.scala     | 10 +--
 .../data/storage/elasticsearch/ESUtils.scala    |  8 +-
 9 files changed, 169 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 9278366..98c2781 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -49,7 +49,7 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
       ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val mappingJson =
       (estype ->
-        ("_all" -> ("enabled" -> 0)) ~
+        ("_all" -> ("enabled" -> false)) ~
         ("properties" ->
           ("key" -> ("type" -> "keyword")) ~
           ("events" -> ("type" -> "keyword"))))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index e7fe4af..0b319ab 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -50,7 +50,7 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
       ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val mappingJson =
       (estype ->
-        ("_all" -> ("enabled" -> 0)) ~
+        ("_all" -> ("enabled" -> false)) ~
         ("properties" ->
           ("id" -> ("type" -> "keyword")) ~
           ("name" -> ("type" -> "keyword"))))
@@ -60,12 +60,18 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
   }
 
   def insert(app: App): Option[Int] = {
-    val id =
-      if (app.id == 0) {
-        var roll = seq.genNext(estype)
-        while (!get(roll).isEmpty) roll = seq.genNext(estype)
-        roll
-      } else app.id
+    val id = app.id match {
+      case v if v == 0 =>
+        @scala.annotation.tailrec
+        def generateId: Int = {
+          seq.genNext(estype).toInt match {
+            case x if !get(x).isEmpty => generateId
+            case x => x
+          }
+        }
+        generateId
+      case v => v
+    }
     update(app.copy(id = id))
     Some(id)
   }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index a173c59..c142beb 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -49,7 +49,7 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
       ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val mappingJson =
       (estype ->
-        ("_all" -> ("enabled" -> 0)) ~
+        ("_all" -> ("enabled" -> false)) ~
         ("properties" ->
           ("name" -> ("type" -> "keyword"))))
     ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
@@ -58,12 +58,18 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
   }
 
   def insert(channel: Channel): Option[Int] = {
-    val id =
-      if (channel.id == 0) {
-        var roll = seq.genNext(estype)
-        while (!get(roll).isEmpty) roll = seq.genNext(estype)
-        roll
-      } else channel.id
+    val id = channel.id match {
+      case v if v == 0 =>
+        @scala.annotation.tailrec
+        def generateId: Int = {
+          seq.genNext(estype).toInt match {
+            case x if !get(x).isEmpty => generateId
+            case x => x
+          }
+        }
+        generateId
+      case v => v
+    }
 
     if (update(channel.copy(id = id))) Some(id) else None
   }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index e123744..de474c1 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -49,7 +49,7 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
       ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val mappingJson =
       (estype ->
-        ("_all" -> ("enabled" -> 0)) ~
+        ("_all" -> ("enabled" -> false)) ~
         ("properties" ->
           ("status" -> ("type" -> "keyword")) ~
           ("startTime" -> ("type" -> "date")) ~

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 48f191a..9b19cf4 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -51,7 +51,7 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
       ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val mappingJson =
       (estype ->
-        ("_all" -> ("enabled" -> 0)) ~
+        ("_all" -> ("enabled" -> false)) ~
         ("properties" ->
           ("status" -> ("type" -> "keyword")) ~
           ("startTime" -> ("type" -> "date")) ~
@@ -59,9 +59,9 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
           ("evaluationClass" -> ("type" -> "keyword")) ~
           ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
           ("batch" -> ("type" -> "keyword")) ~
-          ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~
-          ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~
-          ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no"))))
+          ("evaluatorResults" -> ("type" -> "text")) ~
+          ("evaluatorResultsHTML" -> ("enabled" -> false)) ~
+          ("evaluatorResultsJSON" -> ("enabled" -> false))))
     ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
   } finally {
     restClient.close()
@@ -69,13 +69,17 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
 
   def insert(i: EvaluationInstance): String = {
     val id = i.id match {
-      case x if x.isEmpty =>
-        var roll = seq.genNext(estype).toString
-        while (!get(roll).isEmpty) roll = seq.genNext(estype).toString
-        roll
-      case x => x
+      case v if v.isEmpty =>
+        @scala.annotation.tailrec
+        def generateId: String = {
+          seq.genNext(estype).toString match {
+            case x if !get(x).isEmpty => generateId
+            case x => x
+          }
+        }
+        generateId
+      case v => v
     }
-
     update(i.copy(id = id))
     id
   }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
index 2edbc35..ec72a49 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
@@ -18,6 +18,13 @@
 
 package org.apache.predictionio.data.storage.elasticsearch
 
+import java.net.NetworkInterface
+import java.net.SocketException
+import java.security.SecureRandom
+import java.util.Base64
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.AtomicLong
+
 import org.apache.hadoop.io.MapWritable
 import org.apache.hadoop.io.Text
 import org.apache.predictionio.data.storage.DataMap
@@ -27,6 +34,7 @@ import org.json4s._
 import org.json4s.native.Serialization.read
 import org.json4s.native.Serialization.write
 
+
 object ESEventsUtil {
 
   implicit val formats = DefaultFormats
@@ -80,7 +88,7 @@ object ESEventsUtil {
 
   def eventToPut(event: Event, appId: Int): Map[String, Any] = {
     Map(
-      "eventId" -> event.eventId,
+      "eventId" -> event.eventId.getOrElse { getBase64UUID },
       "event" -> event.event,
       "entityType" -> event.entityType,
       "entityId" -> event.entityId,
@@ -94,4 +102,89 @@ object ESEventsUtil {
     )
   }
 
+  val secureRandom: SecureRandom = new SecureRandom()
+
+  val sequenceNumber: AtomicInteger = new AtomicInteger(secureRandom.nextInt())
+
+  val lastTimestamp: AtomicLong = new AtomicLong(0)
+
+  val secureMungedAddress: Array[Byte] = {
+    val address = getMacAddress match {
+      case Some(x) => x
+      case None =>
+        val dummy: Array[Byte] = new Array[Byte](6)
+        secureRandom.nextBytes(dummy)
+        dummy(0) = (dummy(0) | 0x01.toByte).toByte
+        dummy
+    }
+
+    val mungedBytes: Array[Byte] = new Array[Byte](6)
+    secureRandom.nextBytes(mungedBytes)
+    for (i <- 0 until 6) {
+      mungedBytes(i) = (mungedBytes(i) ^ address(i)).toByte
+    }
+
+    mungedBytes
+  }
+
+  def getMacAddress(): Option[Array[Byte]] = {
+    try {
+      NetworkInterface.getNetworkInterfaces match {
+        case en if en == null => None
+        case en =>
+          new Iterator[NetworkInterface] {
+            def next = en.nextElement
+            def hasNext = en.hasMoreElements
+          }.foldLeft(None: Option[Array[Byte]])((x, y) =>
+            x match {
+              case None =>
+                y.isLoopback match {
+                  case true =>
+                    y.getHardwareAddress match {
+                      case address if isValidAddress(address) => Some(address)
+                      case _ => None
+                    }
+                  case false => None
+                }
+              case _ => x
+            })
+      }
+    } catch {
+      case e: SocketException => None
+    }
+  }
+
+  def isValidAddress(address: Array[Byte]): Boolean = {
+    address match {
+      case v if v == null || v.length != 6 => false
+      case v => v.exists(b => b != 0x00.toByte)
+    }
+  }
+
+  def putLong(array: Array[Byte], l: Long, pos: Int, numberOfLongBytes: Int): Unit = {
+    for (i <- 0 until numberOfLongBytes) {
+      array(pos + numberOfLongBytes - i - 1) = (l >>> (i * 8)).toByte
+    }
+  }
+
+  def getBase64UUID(): String = {
+    val sequenceId: Int = sequenceNumber.incrementAndGet & 0xffffff
+    val timestamp: Long = synchronized {
+      val t = Math.max(lastTimestamp.get, System.currentTimeMillis)
+      if (sequenceId == 0) {
+        lastTimestamp.set(t + 1)
+      } else {
+        lastTimestamp.set(t)
+      }
+      lastTimestamp.get
+    }
+
+    val uuidBytes: Array[Byte] = new Array[Byte](15)
+
+    putLong(uuidBytes, timestamp, 0, 6)
+    System.arraycopy(secureMungedAddress, 0, uuidBytes, 6, secureMungedAddress.length)
+    putLong(uuidBytes, sequenceId, 12, 3)
+
+    Base64.getUrlEncoder().withoutPadding().encodeToString(uuidBytes)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index 18e2fed..6cf7a9a 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -45,8 +45,7 @@ import org.apache.http.entity.StringEntity
 class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String)
     extends LEvents with Logging {
   implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
-  private val seq = new ESSequences(client, config, index)
-  private val seqName = "events"
+  val restClient = client.open()
 
   def getEsType(appId: Int, channelId: Option[Int] = None): String = {
     channelId.map { ch =>
@@ -58,37 +57,31 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
 
   override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
     val estype = getEsType(appId, channelId)
-    val restClient = client.open()
-    try {
-      ESUtils.createIndex(restClient, index,
-        ESUtils.getNumberOfShards(config, index.toUpperCase),
-        ESUtils.getNumberOfReplicas(config, index.toUpperCase))
-      val json =
-        (estype ->
-          ("_all" -> ("enabled" -> 0)) ~
-          ("properties" ->
-            ("name" -> ("type" -> "keyword")) ~
-            ("eventId" -> ("type" -> "keyword")) ~
-            ("event" -> ("type" -> "keyword")) ~
-            ("entityType" -> ("type" -> "keyword")) ~
-            ("entityId" -> ("type" -> "keyword")) ~
-            ("targetEntityType" -> ("type" -> "keyword")) ~
-            ("targetEntityId" -> ("type" -> "keyword")) ~
-            ("properties" -> ("enabled" -> 0)) ~
-            ("eventTime" -> ("type" -> "date")) ~
-            ("tags" -> ("type" -> "keyword")) ~
-            ("prId" -> ("type" -> "keyword")) ~
-            ("creationTime" -> ("type" -> "date"))))
-      ESUtils.createMapping(restClient, index, estype, compact(render(json)))
-    } finally {
-      restClient.close()
-    }
+    ESUtils.createIndex(restClient, index,
+      ESUtils.getNumberOfShards(config, index.toUpperCase),
+      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+    val json =
+      (estype ->
+        ("_all" -> ("enabled" -> false)) ~
+        ("properties" ->
+          ("name" -> ("type" -> "keyword")) ~
+          ("eventId" -> ("type" -> "keyword")) ~
+          ("event" -> ("type" -> "keyword")) ~
+          ("entityType" -> ("type" -> "keyword")) ~
+          ("entityId" -> ("type" -> "keyword")) ~
+          ("targetEntityType" -> ("type" -> "keyword")) ~
+          ("targetEntityId" -> ("type" -> "keyword")) ~
+          ("properties" -> ("enabled" -> false)) ~
+          ("eventTime" -> ("type" -> "date")) ~
+          ("tags" -> ("type" -> "keyword")) ~
+          ("prId" -> ("type" -> "keyword")) ~
+          ("creationTime" -> ("type" -> "date"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(json)))
     true
   }
 
   override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
     val estype = getEsType(appId, channelId)
-    val restClient = client.open()
     try {
       val json =
         ("query" ->
@@ -108,13 +101,11 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
       case e: Exception =>
         error(s"Failed to remove $index/$estype", e)
         false
-    } finally {
-      restClient.close()
     }
   }
 
   override def close(): Unit = {
-    // nothing
+    restClient.close()
   }
 
   override def futureInsert(
@@ -123,12 +114,9 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
     Future {
       val estype = getEsType(appId, channelId)
-      val restClient = client.open()
       try {
         val id = event.eventId.getOrElse {
-          var roll = seq.genNext(seqName)
-          while (exists(restClient, estype, roll)) roll = seq.genNext(seqName)
-          roll.toString
+          ESEventsUtil.getBase64UUID
         }
         val json =
           ("eventId" -> id) ~
@@ -161,8 +149,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
         case e: IOException =>
           error(s"Failed to update $index/$estype/<id>", e)
           ""
-      } finally {
-        restClient.close()
       }
     }
   }
@@ -196,7 +182,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
     Future {
       val estype = getEsType(appId, channelId)
-      val restClient = client.open()
       try {
         val json =
           ("query" ->
@@ -220,8 +205,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
         case e: IOException =>
           error("Failed to access to /$index/$estype/_search", e)
           None
-      } finally {
-        restClient.close()
       }
     }
   }
@@ -232,7 +215,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
     Future {
       val estype = getEsType(appId, channelId)
-      val restClient = client.open()
       try {
         val json =
           ("query" ->
@@ -255,8 +237,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
         case e: IOException =>
           error(s"Failed to update $index/$estype:$eventId", e)
           false
-      } finally {
-        restClient.close()
       }
     }
   }
@@ -276,7 +256,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
     (implicit ec: ExecutionContext): Future[Iterator[Event]] = {
     Future {
       val estype = getEsType(appId, channelId)
-      val restClient = client.open()
       try {
         val query = ESUtils.createEventQuery(
           startTime, untilTime, entityType, entityId,
@@ -289,8 +268,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
         case e: IOException =>
           error(e.getMessage)
           Iterator[Event]()
-      } finally {
-        restClient.close()
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index 8a6a047..9fd31a3 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -46,15 +46,15 @@ class ESSequences(client: ESClient, config: StorageClientConfig, index: String)
       ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val mappingJson =
       (estype ->
-        ("_all" -> ("enabled" -> 0)) ~
+        ("_all" -> ("enabled" -> false)) ~
         ("properties" ->
-          ("n" -> ("enabled" -> 0))))
+          ("n" -> ("enabled" -> false))))
     ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
   } finally {
     restClient.close()
   }
 
-  def genNext(name: String): Int = {
+  def genNext(name: String): Long = {
     val restClient = client.open()
     try {
       val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
@@ -67,9 +67,9 @@ class ESSequences(client: ESClient, config: StorageClientConfig, index: String)
       val result = (jsonResponse \ "result").extract[String]
       result match {
         case "created" =>
-          (jsonResponse \ "_version").extract[Int]
+          (jsonResponse \ "_version").extract[Long]
         case "updated" =>
-          (jsonResponse \ "_version").extract[Int]
+          (jsonResponse \ "_version").extract[Long]
         case _ =>
           throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name")
       }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index 34c76eb..2bf18ef 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -100,11 +100,12 @@ object ESUtils {
     query: String,
     size: Int)(
       implicit formats: Formats): Seq[JValue] = {
+    val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
     val response = client.performRequest(
       "POST",
       s"/$index/$estype/_search",
       Map("size" -> s"${size}"),
-      new StringEntity(query))
+      entity)
     val responseJValue = parse(EntityUtils.toString(response.getEntity))
     val hits = (responseJValue \ "hits" \ "hits").extract[Seq[JValue]]
     hits.map(h => (h \ "_source"))
@@ -140,7 +141,7 @@ object ESUtils {
       if (hits.isEmpty) results
       else {
         val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId)
-        val scrollBody = new StringEntity(compact(render(json)))
+        val scrollBody = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
         val response = client.performRequest(
           "POST",
           "/_search/scroll",
@@ -153,11 +154,12 @@ object ESUtils {
       }
     }
 
+    val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
     val response = client.performRequest(
       "POST",
       s"/$index/$estype/_search",
       Map("scroll" -> scrollLife),
-      new StringEntity(query))
+      entity)
     val responseJValue = parse(EntityUtils.toString(response.getEntity))
     scroll((responseJValue \ "_scroll_id").extract[String],
       (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],