You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/02/11 22:00:10 UTC

[03/10] incubator-predictionio git commit: add ESClient to close RestClient

add ESClient to close RestClient


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/d4e75ab5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/d4e75ab5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/d4e75ab5

Branch: refs/heads/feature/es5
Commit: d4e75ab5441c2d7278c0cc55c6f1d3b51c9479a0
Parents: 36b79d7
Author: Shinsuke Sugaya <sh...@yahoo.co.jp>
Authored: Mon Jan 16 17:28:54 2017 +0900
Committer: Shinsuke Sugaya <sh...@yahoo.co.jp>
Committed: Mon Jan 16 17:28:54 2017 +0900

----------------------------------------------------------------------
 .../predictionio/workflow/CreateWorkflow.scala  | 74 +++++++--------
 .../storage/elasticsearch/ESAccessKeys.scala    | 48 +++++++---
 .../data/storage/elasticsearch/ESApps.scala     | 48 +++++++---
 .../data/storage/elasticsearch/ESChannels.scala | 42 ++++++---
 .../elasticsearch/ESEngineInstances.scala       | 75 ++++++++++-----
 .../elasticsearch/ESEngineManifests.scala       | 23 +++--
 .../elasticsearch/ESEvaluationInstances.scala   | 62 +++++++-----
 .../data/storage/elasticsearch/ESLEvents.scala  | 99 ++++++++++++--------
 .../data/storage/elasticsearch/ESPEvents.scala  | 12 +--
 .../storage/elasticsearch/ESSequences.scala     | 22 +++--
 .../data/storage/elasticsearch/ESUtils.scala    |  6 +-
 .../storage/elasticsearch/StorageClient.scala   | 24 +++--
 12 files changed, 330 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
index edfc1b6..899ace2 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
@@ -223,40 +223,36 @@ object CreateWorkflow extends Logging {
         engineFactoryObj.engineParams(wfc.engineParamsKey)
       }
 
-      try {
-        val engineInstance = EngineInstance(
-          id = "",
-          status = "INIT",
-          startTime = DateTime.now,
-          endTime = DateTime.now,
-          engineId = wfc.engineId,
-          engineVersion = wfc.engineVersion,
-          engineVariant = variantId,
-          engineFactory = engineFactory,
-          batch = wfc.batch,
-          env = pioEnvVars,
-          sparkConf = workflowParams.sparkEnv,
-          dataSourceParams =
-            JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
-          preparatorParams =
-            JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
-          algorithmsParams =
-            JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
-          servingParams =
-            JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
+      val engineInstance = EngineInstance(
+        id = "",
+        status = "INIT",
+        startTime = DateTime.now,
+        endTime = DateTime.now,
+        engineId = wfc.engineId,
+        engineVersion = wfc.engineVersion,
+        engineVariant = variantId,
+        engineFactory = engineFactory,
+        batch = wfc.batch,
+        env = pioEnvVars,
+        sparkConf = workflowParams.sparkEnv,
+        dataSourceParams =
+          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
+        preparatorParams =
+          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
+        algorithmsParams =
+          JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
+        servingParams =
+          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
 
-        val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
-          engineInstance)
+      val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
+        engineInstance)
 
-        CoreWorkflow.runTrain(
-          env = pioEnvVars,
-          params = workflowParams,
-          engine = trainableEngine,
-          engineParams = engineParams,
-          engineInstance = engineInstance.copy(id = engineInstanceId))
-      } finally {
-        Storage.getLEvents().close()
-      }
+      CoreWorkflow.runTrain(
+        env = pioEnvVars,
+        params = workflowParams,
+        engine = trainableEngine,
+        engineParams = engineParams,
+        engineInstance = engineInstance.copy(id = engineInstanceId))
     } else {
       val workflowParams = WorkflowParams(
         verbose = wfc.verbosity,
@@ -271,15 +267,11 @@ object CreateWorkflow extends Logging {
         env = pioEnvVars,
         sparkConf = workflowParams.sparkEnv
       )
-      try {
-        Workflow.runEvaluation(
-          evaluation = evaluation.get,
-          engineParamsGenerator = engineParamsGenerator.get,
-          evaluationInstance = evaluationInstance,
-          params = workflowParams)
-      } finally {
-        Storage.getLEvents().close()
-      }
+      Workflow.runEvaluation(
+        evaluation = evaluation.get,
+        engineParamsGenerator = engineParamsGenerator.get,
+        evaluationInstance = evaluationInstance,
+        params = workflowParams)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 2c69cf4..9156fab 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -37,19 +37,24 @@ import grizzled.slf4j.Logging
 import org.elasticsearch.client.ResponseException
 
 /** Elasticsearch implementation of AccessKeys. */
-class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: String)
+class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
     extends AccessKeys with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "accesskeys"
 
-  ESUtils.createIndex(client, index)
-  val mappingJson =
-    (estype ->
-      ("_all" -> ("enabled" -> 0)) ~
-      ("properties" ->
-        ("key" -> ("type" -> "keyword")) ~
-        ("events" -> ("type" -> "keyword"))))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("key" -> ("type" -> "keyword")) ~
+          ("events" -> ("type" -> "keyword"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
 
   def insert(accessKey: AccessKey): Option[String] = {
     val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
@@ -58,8 +63,9 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin
   }
 
   def get(id: String): Option[AccessKey] = {
+    val restClient = client.open()
     try {
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "GET",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -81,41 +87,50 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin
       case e: IOException =>
         error("Failed to access to /$index/$estype/$key", e)
         None
+    } finally {
+      restClient.close()
     }
   }
 
   def getAll(): Seq[AccessKey] = {
+    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("match_all" -> List.empty))
-      ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
+      ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error("Failed to access to /$index/$estype/_search", e)
         Nil
+    } finally {
+      restClient.close()
     }
   }
 
   def getByAppid(appid: Int): Seq[AccessKey] = {
+    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("term" ->
             ("appid" -> appid)))
-      ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
+      ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error("Failed to access to /$index/$estype/_search", e)
         Nil
+    } finally {
+      restClient.close()
     }
   }
 
   def update(accessKey: AccessKey): Unit = {
     val id = accessKey.key
+    val restClient = client.open()
     try {
       val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON)
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "POST",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava,
@@ -131,12 +146,15 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
     }
   }
 
   def delete(id: String): Unit = {
+    val restClient = client.open()
     try {
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "DELETE",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -150,6 +168,8 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/id", e)
+    } finally {
+      restClient.close()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index 7a65379..0379c90 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -37,20 +37,25 @@ import grizzled.slf4j.Logging
 import org.elasticsearch.client.ResponseException
 
 /** Elasticsearch implementation of Items. */
-class ESApps(client: RestClient, config: StorageClientConfig, index: String)
+class ESApps(client: ESClient, config: StorageClientConfig, index: String)
     extends Apps with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "apps"
   private val seq = new ESSequences(client, config, index)
 
-  ESUtils.createIndex(client, index)
-  val mappingJson =
-    (estype ->
-      ("_all" -> ("enabled" -> 0))~
-      ("properties" ->
-        ("id" -> ("type" -> "keyword")) ~
-        ("name" -> ("type" -> "keyword"))))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("id" -> ("type" -> "keyword")) ~
+          ("name" -> ("type" -> "keyword"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
 
   def insert(app: App): Option[Int] = {
     val id =
@@ -64,8 +69,9 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String)
   }
 
   def get(id: Int): Option[App] = {
+    val restClient = client.open()
     try {
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "GET",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -87,17 +93,20 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String)
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/$id", e)
         None
+    } finally {
+      restClient.close()
     }
   }
 
   def getByName(name: String): Option[App] = {
+    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("term" ->
             ("name" -> name)))
       val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "POST",
         s"/$index/$estype/_search",
         Map.empty[String, String].asJava,
@@ -114,27 +123,33 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String)
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/_search", e)
         None
+    } finally {
+      restClient.close()
     }
   }
 
   def getAll(): Seq[App] = {
+    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("match_all" -> List.empty))
-      ESUtils.getAll[App](client, index, estype, compact(render(json)))
+      ESUtils.getAll[App](restClient, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error("Failed to access to /$index/$estype/_search", e)
         Nil
+    } finally {
+      restClient.close()
     }
   }
 
   def update(app: App): Unit = {
     val id = app.id.toString
+    val restClient = client.open()
     try {
       val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON);
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "POST",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava,
@@ -150,12 +165,15 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String)
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
     }
   }
 
   def delete(id: Int): Unit = {
+    val restClient = client.open()
     try {
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "DELETE",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -169,6 +187,8 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String)
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/id", e)
+    } finally {
+      restClient.close()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index c90d668..b319c26 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -36,19 +36,24 @@ import org.json4s.native.Serialization.write
 import grizzled.slf4j.Logging
 import org.elasticsearch.client.ResponseException
 
-class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
+class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
     extends Channels with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "channels"
   private val seq = new ESSequences(client, config, index)
 
-  ESUtils.createIndex(client, index)
-  val mappingJson =
-    (estype ->
-      ("_all" -> ("enabled" -> 0))~
-      ("properties" ->
-        ("name" -> ("type" -> "keyword"))))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("name" -> ("type" -> "keyword"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
 
   def insert(channel: Channel): Option[Int] = {
     val id =
@@ -62,8 +67,9 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
   }
 
   def get(id: Int): Option[Channel] = {
+    val restClient = client.open()
     try {
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "GET",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -85,28 +91,34 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/$id", e)
         None
+    } finally {
+      restClient.close()
     }
   }
 
   def getByAppid(appid: Int): Seq[Channel] = {
+    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("term" ->
             ("appid" -> appid)))
-      ESUtils.getAll[Channel](client, index, estype, compact(render(json)))
+      ESUtils.getAll[Channel](restClient, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/_search", e)
         Nil
+    } finally {
+      restClient.close()
     }
   }
 
   def update(channel: Channel): Boolean = {
     val id = channel.id.toString
+    val restClient = client.open()
     try {
       val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON)
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "POST",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava,
@@ -124,12 +136,15 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
         false
+    } finally {
+      restClient.close()
     }
   }
 
   def delete(id: Int): Unit = {
+    val restClient = client.open()
     try {
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "DELETE",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -143,7 +158,8 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index 08f87f3..68cdeac 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -37,30 +37,35 @@ import org.json4s.native.Serialization.write
 import grizzled.slf4j.Logging
 import org.elasticsearch.client.ResponseException
 
-class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: String)
+class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: String)
     extends EngineInstances with Logging {
   implicit val formats = DefaultFormats + new EngineInstanceSerializer
   private val estype = "engine_instances"
 
-  ESUtils.createIndex(client, index)
-  val mappingJson =
-    (estype ->
-      ("_all" -> ("enabled" -> 0))~
-      ("properties" ->
-        ("status" -> ("type" -> "keyword")) ~
-        ("startTime" -> ("type" -> "date")) ~
-        ("endTime" -> ("type" -> "date")) ~
-        ("engineId" -> ("type" -> "keyword")) ~
-        ("engineVersion" -> ("type" -> "keyword")) ~
-        ("engineVariant" -> ("type" -> "keyword")) ~
-        ("engineFactory" -> ("type" -> "keyword")) ~
-        ("batch" -> ("type" -> "keyword")) ~
-        ("dataSourceParams" -> ("type" -> "keyword")) ~
-        ("preparatorParams" -> ("type" -> "keyword")) ~
-        ("algorithmsParams" -> ("type" -> "keyword")) ~
-        ("servingParams" -> ("type" -> "keyword")) ~
-        ("status" -> ("type" -> "keyword"))))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("status" -> ("type" -> "keyword")) ~
+          ("startTime" -> ("type" -> "date")) ~
+          ("endTime" -> ("type" -> "date")) ~
+          ("engineId" -> ("type" -> "keyword")) ~
+          ("engineVersion" -> ("type" -> "keyword")) ~
+          ("engineVariant" -> ("type" -> "keyword")) ~
+          ("engineFactory" -> ("type" -> "keyword")) ~
+          ("batch" -> ("type" -> "keyword")) ~
+          ("dataSourceParams" -> ("type" -> "keyword")) ~
+          ("preparatorParams" -> ("type" -> "keyword")) ~
+          ("algorithmsParams" -> ("type" -> "keyword")) ~
+          ("servingParams" -> ("type" -> "keyword")) ~
+          ("status" -> ("type" -> "keyword"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
 
   def insert(i: EngineInstance): String = {
     val id = i.id match {
@@ -81,9 +86,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
   }
 
   def preInsert(): Option[String] = {
+    val restClient = client.open()
     try {
       val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "POST",
         s"/$index/$estype/",
         Map.empty[String, String].asJava,
@@ -101,12 +107,15 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
       case e: IOException =>
         error(s"Failed to create $index/$estype", e)
         None
+    } finally {
+      restClient.close()
     }
   }
 
   def get(id: String): Option[EngineInstance] = {
+    val restClient = client.open()
     try {
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "GET",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -128,19 +137,24 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/$id", e)
         None
+    } finally {
+      restClient.close()
     }
   }
 
   def getAll(): Seq[EngineInstance] = {
+    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("match_all" -> List.empty))
-      ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
+      ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error("Failed to access to /$index/$estype/_search", e)
         Nil
+    } finally {
+      restClient.close()
     }
   }
 
@@ -148,6 +162,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
     engineId: String,
     engineVersion: String,
     engineVariant: String): Seq[EngineInstance] = {
+    val restClient = client.open()
     try {
       val json =
         ("query" ->
@@ -164,11 +179,13 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
               ("sort" -> List(
                 ("startTime" ->
                   ("order" -> "desc"))))
-      ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
+      ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/_search", e)
         Nil
+    } finally {
+      restClient.close()
     }
   }
 
@@ -183,9 +200,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
 
   def update(i: EngineInstance): Unit = {
     val id = i.id
+    val restClient = client.open()
     try {
       val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "POST",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava,
@@ -201,12 +219,15 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
     }
   }
 
   def delete(id: String): Unit = {
+    val restClient = client.open()
     try {
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "DELETE",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -220,6 +241,8 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
index a965c71..ae4d86b 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
@@ -37,7 +37,7 @@ import org.json4s.native.Serialization.write
 import grizzled.slf4j.Logging
 import org.elasticsearch.client.ResponseException
 
-class ESEngineManifests(client: RestClient, config: StorageClientConfig, index: String)
+class ESEngineManifests(client: ESClient, config: StorageClientConfig, index: String)
     extends EngineManifests with Logging {
   implicit val formats = DefaultFormats + new EngineManifestSerializer
   private val estype = "engine_manifests"
@@ -45,9 +45,10 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index:
 
   def insert(engineManifest: EngineManifest): Unit = {
     val id = esid(engineManifest.id, engineManifest.version)
+    val restClient = client.open()
     try {
       val entity = new NStringEntity(write(engineManifest), ContentType.APPLICATION_JSON)
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "POST",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava,
@@ -63,13 +64,16 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index:
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
     }
   }
 
   def get(id: String, version: String): Option[EngineManifest] = {
     val esId = esid(id, version)
+    val restClient = client.open()
     try {
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "GET",
         s"/$index/$estype/$esId",
         Map.empty[String, String].asJava)
@@ -91,20 +95,24 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index:
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/$esId", e)
         None
+    } finally {
+      restClient.close()
     }
-
   }
 
   def getAll(): Seq[EngineManifest] = {
+    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("match_all" ->  List.empty))
-      ESUtils.getAll[EngineManifest](client, index, estype, compact(render(json)))
+      ESUtils.getAll[EngineManifest](restClient, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error("Failed to access to /$index/$estype/_search", e)
         Nil
+    } finally {
+      restClient.close()
     }
   }
 
@@ -113,8 +121,9 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index:
 
   def delete(id: String, version: String): Unit = {
     val esId = esid(id, version)
+    val restClient = client.open()
     try {
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "DELETE",
         s"/$index/$estype/$esId",
         Map.empty[String, String].asJava)
@@ -128,6 +137,8 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index:
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$esId", e)
+    } finally {
+      restClient.close()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 0e71f79..1f798f0 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -38,27 +38,32 @@ import org.json4s.native.Serialization.write
 import grizzled.slf4j.Logging
 import org.elasticsearch.client.ResponseException
 
-class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, index: String)
+class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index: String)
     extends EvaluationInstances with Logging {
   implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
   private val estype = "evaluation_instances"
   private val seq = new ESSequences(client, config, index)
 
-  ESUtils.createIndex(client, index)
-  val mappingJson =
-    (estype ->
-      ("_all" -> ("enabled" -> 0))~
-      ("properties" ->
-        ("status" -> ("type" -> "keyword")) ~
-        ("startTime" -> ("type" -> "date")) ~
-        ("endTime" -> ("type" -> "date")) ~
-        ("evaluationClass" -> ("type" -> "keyword")) ~
-        ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
-        ("batch" -> ("type" -> "keyword")) ~
-        ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~
-        ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~
-        ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no"))))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("status" -> ("type" -> "keyword")) ~
+          ("startTime" -> ("type" -> "date")) ~
+          ("endTime" -> ("type" -> "date")) ~
+          ("evaluationClass" -> ("type" -> "keyword")) ~
+          ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
+          ("batch" -> ("type" -> "keyword")) ~
+          ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~
+          ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~
+          ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
 
   def insert(i: EvaluationInstance): String = {
     val id = i.id match {
@@ -74,8 +79,9 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind
   }
 
   def get(id: String): Option[EvaluationInstance] = {
+    val restClient = client.open()
     try {
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "GET",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -97,23 +103,29 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/$id", e)
         None
+    } finally {
+      restClient.close()
     }
   }
 
   def getAll(): Seq[EvaluationInstance] = {
+    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("match_all" -> List.empty))
-      ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
+      ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
+        error(s"Failed to access to /$index/$estype/_search", e)
         Nil
+    } finally {
+      restClient.close()
     }
   }
 
   def getCompleted(): Seq[EvaluationInstance] = {
+    val restClient = client.open()
     try {
       val json =
         ("query" ->
@@ -122,19 +134,22 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind
             ("sort" ->
               ("startTime" ->
                 ("order" -> "desc")))
-      ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
+      ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
+        error(s"Failed to access to /$index/$estype/_search", e)
         Nil
+    } finally {
+      restClient.close()
     }
   }
 
   def update(i: EvaluationInstance): Unit = {
     val id = i.id
+    val restClient = client.open()
     try {
       val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "POST",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava,
@@ -150,12 +165,15 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
     }
   }
 
   def delete(id: String): Unit = {
+    val restClient = client.open()
     try {
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "DELETE",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -169,6 +187,8 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index ef25204..b4f7dc5 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -40,7 +40,7 @@ import org.json4s.ext.JodaTimeSerializers
 import grizzled.slf4j.Logging
 import org.elasticsearch.client.ResponseException
 
-class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String)
+class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String)
     extends LEvents with Logging {
   implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
   private val seq = new ESSequences(client, config, index)
@@ -56,41 +56,47 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
 
   override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
     val estype = getEsType(appId, channelId)
-    ESUtils.createIndex(client, index)
-    val json =
-      (estype ->
-        ("_all" -> ("enabled" -> 0)) ~
-        ("properties" ->
-          ("name" -> ("type" -> "keyword")) ~
-          ("eventId" -> ("type" -> "keyword")) ~
-          ("event" -> ("type" -> "keyword")) ~
-          ("entityType" -> ("type" -> "keyword")) ~
-          ("entityId" -> ("type" -> "keyword")) ~
-          ("targetEntityType" -> ("type" -> "keyword")) ~
-          ("targetEntityId" -> ("type" -> "keyword")) ~
+    val restClient = client.open()
+    try {
+      ESUtils.createIndex(restClient, index)
+      val json =
+        (estype ->
+          ("_all" -> ("enabled" -> 0)) ~
           ("properties" ->
-            ("type" -> "nested") ~
+            ("name" -> ("type" -> "keyword")) ~
+            ("eventId" -> ("type" -> "keyword")) ~
+            ("event" -> ("type" -> "keyword")) ~
+            ("entityType" -> ("type" -> "keyword")) ~
+            ("entityId" -> ("type" -> "keyword")) ~
+            ("targetEntityType" -> ("type" -> "keyword")) ~
+            ("targetEntityId" -> ("type" -> "keyword")) ~
             ("properties" ->
-              ("fields" -> ("type" -> "nested") ~
-                ("properties" ->
-                  ("user" -> ("type" -> "long")) ~
-                  ("num" -> ("type" -> "long")))))) ~
-                  ("eventTime" -> ("type" -> "date")) ~
-                  ("tags" -> ("type" -> "keyword")) ~
-                  ("prId" -> ("type" -> "keyword")) ~
-                  ("creationTime" -> ("type" -> "date"))))
-    ESUtils.createMapping(client, index, estype, compact(render(json)))
+              ("type" -> "nested") ~
+              ("properties" ->
+                ("fields" -> ("type" -> "nested") ~
+                  ("properties" ->
+                    ("user" -> ("type" -> "long")) ~
+                    ("num" -> ("type" -> "long")))))) ~
+                    ("eventTime" -> ("type" -> "date")) ~
+                    ("tags" -> ("type" -> "keyword")) ~
+                    ("prId" -> ("type" -> "keyword")) ~
+                    ("creationTime" -> ("type" -> "date"))))
+      ESUtils.createMapping(restClient, index, estype, compact(render(json)))
+    } finally {
+      restClient.close()
+    }
     true
   }
 
   override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
     val estype = getEsType(appId, channelId)
+    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("match_all" -> List.empty))
       val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
-      client.performRequest(
+      restClient.performRequest(
         "POST",
         s"/$index/$estype/_delete_by_query",
         Map.empty[String, String].asJava,
@@ -104,14 +110,13 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
       case e: Exception =>
         error(s"Failed to remove $index/$estype", e)
         false
+    } finally {
+      restClient.close()
     }
   }
 
   override def close(): Unit = {
-    try client.close() catch {
-      case e: Exception =>
-        error("Failed to close client.", e)
-    }
+    // nothing
   }
 
   override def futureInsert(
@@ -120,15 +125,16 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
     Future {
       val estype = getEsType(appId, channelId)
-      val id = event.eventId.getOrElse {
-        var roll = seq.genNext(seqName)
-        while (exists(estype, roll)) roll = seq.genNext(seqName)
-        roll.toString
-      }
-      val json = write(event.copy(eventId = Some(id)))
+      val restClient = client.open()
       try {
+        val id = event.eventId.getOrElse {
+          var roll = seq.genNext(seqName)
+          while (exists(restClient, estype, roll)) roll = seq.genNext(seqName)
+          roll.toString
+        }
+        val json = write(event.copy(eventId = Some(id)))
         val entity = new NStringEntity(json, ContentType.APPLICATION_JSON);
-        val response = client.performRequest(
+        val response = restClient.performRequest(
           "POST",
           s"/$index/$estype/$id",
           Map.empty[String, String].asJava,
@@ -144,15 +150,17 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
         }
       } catch {
         case e: IOException =>
-          error(s"Failed to update $index/$estype/$id: $json", e)
+          error(s"Failed to update $index/$estype/<id>", e)
           ""
+      } finally {
+        restClient.close()
       }
     }
   }
 
-  private def exists(estype: String, id: Int): Boolean = {
+  private def exists(restClient: RestClient, estype: String, id: Int): Boolean = {
     try {
-      client.performRequest(
+      restClient.performRequest(
         "GET",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
@@ -179,13 +187,14 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val restClient = client.open()
       try {
         val json =
           ("query" ->
             ("term" ->
               ("eventId" -> eventId)))
         val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
-        val response = client.performRequest(
+        val response = restClient.performRequest(
           "POST",
           s"/$index/$estype/_search",
           Map.empty[String, String].asJava,
@@ -202,6 +211,8 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
         case e: IOException =>
-          error("Failed to access to /$index/$estype/_search", e)
+          error(s"Failed to access to /$index/$estype/_search", e)
           None
+      } finally {
+        restClient.close()
       }
     }
   }
@@ -212,13 +223,14 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val restClient = client.open()
       try {
         val json =
           ("query" ->
             ("term" ->
               ("eventId" -> eventId)))
         val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
-        val response = client.performRequest(
+        val response = restClient.performRequest(
           "POST",
           s"/$index/$estype/_delete_by_query",
           Map.empty[String, String].asJava)
@@ -234,6 +246,8 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
         case e: IOException =>
           error(s"Failed to update $index/$estype:$eventId", e)
           false
+      } finally {
+        restClient.close()
       }
     }
   }
@@ -253,15 +267,18 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
     (implicit ec: ExecutionContext): Future[Iterator[Event]] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val restClient = client.open()
       try {
         val query = ESUtils.createEventQuery(
           startTime, untilTime, entityType, entityId,
           eventNames, targetEntityType, targetEntityId, None)
-        ESUtils.getAll[Event](client, index, estype, query).toIterator
+        ESUtils.getAll[Event](restClient, index, estype, query).toIterator
       } catch {
         case e: IOException =>
           error(e.getMessage)
           Iterator[Event]()
+      } finally {
+        restClient.close()
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
index 0e3eec8..5784b3f 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
@@ -41,16 +41,10 @@ import org.json4s.native.JsonMethods._
 import org.json4s.ext.JodaTimeSerializers
 
 
-class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
+class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
     extends PEvents {
   implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
 
-  // client is not used.
-  try client.close() catch {
-    case e: Exception =>
-      logger.error("Failed to close client.", e)
-  }
-
   def getEsType(appId: Int, channelId: Option[Int] = None): String = {
     channelId.map { ch =>
       s"${appId}_${ch}"
@@ -114,7 +108,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
     eventIds: RDD[String],
     appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
     val estype = getEsType(appId, channelId)
-    val restClient = ESUtils.createRestClient(config)
+    val restClient = client.open()
     try {
       eventIds.foreachPartition { iter =>
         iter.foreach { eventId =>
@@ -124,7 +118,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
                 ("term" ->
                   ("eventId" -> eventId)))
             val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
-            val response = client.performRequest(
+            val response = restClient.performRequest(
               "POST",
               s"/$index/$estype/_delete_by_query",
               Map.empty[String, String].asJava)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index c067f3a..4eb8cd7 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -35,20 +35,26 @@ import org.json4s.native.Serialization.write
 
 import grizzled.slf4j.Logging
 
-class ESSequences(client: RestClient, config: StorageClientConfig, index: String) extends Logging {
+class ESSequences(client: ESClient, config: StorageClientConfig, index: String) extends Logging {
   implicit val formats = DefaultFormats
   private val estype = "sequences"
 
-  ESUtils.createIndex(client, index)
-  val mappingJson =
-    (estype ->
-      ("_all" -> ("enabled" -> 0)))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
 
   def genNext(name: String): Int = {
+    val restClient = client.open()
     try {
       val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
-      val response = client.performRequest(
+      val response = restClient.performRequest(
         "POST",
         s"/$index/$estype/$name",
         Map.empty[String, String].asJava,
@@ -66,6 +72,8 @@ class ESSequences(client: RestClient, config: StorageClientConfig, index: String
     } catch {
       case e: IOException =>
         throw new StorageClientException(s"Failed to update $index/$estype/$name", e)
+    } finally {
+      restClient.close()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index 68e3f57..db841b6 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -151,15 +151,13 @@ object ESUtils {
        |}""".stripMargin
   }
 
-  def createRestClient(config: StorageClientConfig): RestClient = {
+  def getHttpHosts(config: StorageClientConfig): Seq[HttpHost] = {
     val hosts = config.properties.get("HOSTS").
       map(_.split(",").toSeq).getOrElse(Seq("localhost"))
     val ports = config.properties.get("PORTS").
       map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
     val schemes = config.properties.get("SCHEMES").
       map(_.split(",").toSeq).getOrElse(Seq("http"))
-    val httpHosts = (hosts, ports, schemes).zipped.map(
-      (h, p, s) => new HttpHost(h, p, s))
-    RestClient.builder(httpHosts: _*).build()
+    (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
index 912d467..647d180 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
@@ -17,22 +17,28 @@
 
 package org.apache.predictionio.data.storage.elasticsearch
 
-import grizzled.slf4j.Logging
+import org.apache.http.HttpHost
 import org.apache.predictionio.data.storage.BaseStorageClient
 import org.apache.predictionio.data.storage.StorageClientConfig
 import org.apache.predictionio.data.storage.StorageClientException
-import java.net.InetAddress
 import org.elasticsearch.client.RestClient
-import org.apache.http.HttpHost
+
+import grizzled.slf4j.Logging
+
+case class ESClient(hosts: Seq[HttpHost]) {
+  def open(): RestClient = {
+    try {
+      RestClient.builder(hosts: _*).build()
+    } catch {
+      case e: Throwable =>
+        throw new StorageClientException(e.getMessage, e)
+    }
+  }
+}
 
 class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
     with Logging {
   override val prefix = "ES"
 
-  val client = try {
-    ESUtils.createRestClient(config)
-  } catch {
-    case e: Throwable =>
-      throw new StorageClientException(e.getMessage, e)
-  }
+  val client = ESClient(ESUtils.getHttpHosts(config))
 }