Posted to commits@predictionio.apache.org by ch...@apache.org on 2017/09/28 15:55:25 UTC

[44/57] [abbrv] incubator-predictionio git commit: [PIO-106, PIO-114] Elasticsearch 5.x singleton client with authentication

[PIO-106,PIO-114] Elasticsearch 5.x singleton client with authentication

Closes #421


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/bf84ede6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/bf84ede6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/bf84ede6

Branch: refs/heads/livedoc
Commit: bf84ede6fe475ec591e784eb453c6194befb8515
Parents: eb61358
Author: Mars Hall <ma...@heroku.com>
Authored: Tue Aug 29 11:38:34 2017 -0700
Committer: Mars Hall <ma...@heroku.com>
Committed: Tue Aug 29 11:38:34 2017 -0700

----------------------------------------------------------------------
 bin/compute-classpath.sh                        |  21 ++-
 conf/pio-env.sh.template                        |   3 +
 .../predictionio/workflow/BatchPredict.scala    | 140 ++++++++++---------
 .../workflow/CleanupFunctions.scala             |  65 +++++++++
 .../predictionio/workflow/CoreWorkflow.scala    |  66 +++++----
 .../predictionio/data/storage/Storage.scala     |   8 +-
 storage/elasticsearch/build.sbt                 |   8 +-
 .../storage/elasticsearch/ESAccessKeys.scala    |  55 +++-----
 .../data/storage/elasticsearch/ESApps.scala     |  55 +++-----
 .../data/storage/elasticsearch/ESChannels.scala |  50 +++----
 .../elasticsearch/ESEngineInstances.scala       |  84 ++++-------
 .../elasticsearch/ESEvaluationInstances.scala   |  71 ++++------
 .../data/storage/elasticsearch/ESLEvents.scala  |  49 +++++--
 .../data/storage/elasticsearch/ESPEvents.scala  |  29 ++--
 .../storage/elasticsearch/ESSequences.scala     |  30 ++--
 .../storage/elasticsearch/StorageClient.scala   |  69 ++++++++-
 .../elasticsearch/StorageClientSpec.scala       |  67 +++++++++
 .../elasticsearch/StorageTestUtils.scala        |  28 ++++
 storage/hdfs/project/build.properties           |   1 +
 .../tools/export/EventsToFile.scala             |  62 ++++----
 .../predictionio/tools/imprt/FileToEvents.scala |  64 +++++----
 21 files changed, 596 insertions(+), 429 deletions(-)
----------------------------------------------------------------------

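For context, here is a minimal sketch of the approach this commit takes: one Elasticsearch 5.x REST client, built once with optional HTTP basic auth, shared by the storage DAOs instead of being opened and closed per request. This helper is illustrative only and is not the StorageClient.scala contained in this commit; the object name and parameters are assumptions.

    import org.apache.http.HttpHost
    import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
    import org.apache.http.impl.client.BasicCredentialsProvider
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
    import org.elasticsearch.client.{RestClient, RestClientBuilder}

    object SingletonESClient {
      // Build the shared client once; DAOs reuse it instead of open()/close() per call.
      def build(host: HttpHost, auth: Option[(String, String)]): RestClient = {
        val builder = RestClient.builder(host)
        auth.foreach { case (user, pass) =>
          // Attach HTTP basic auth credentials only when both values are configured.
          val credentials = new BasicCredentialsProvider()
          credentials.setCredentials(
            AuthScope.ANY, new UsernamePasswordCredentials(user, pass))
          builder.setHttpClientConfigCallback(
            new RestClientBuilder.HttpClientConfigCallback {
              override def customizeHttpClient(
                b: HttpAsyncClientBuilder): HttpAsyncClientBuilder =
                b.setDefaultCredentialsProvider(credentials)
            })
        }
        builder.build()
      }
    }

Usage would look like SingletonESClient.build(new HttpHost("localhost", 9200, "http"), Some(("my-name", "my-secret"))).
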

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/bin/compute-classpath.sh
----------------------------------------------------------------------
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index 3e59ca7..7a38e0b 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -42,8 +42,25 @@ ASSEMBLY_JARS=$(printf "${MAIN_JAR}\n${DATA_JARS}" | paste -sd "," -)
 
 # Build up classpath
 CLASSPATH="${PIO_CONF_DIR}"
-CLASSPATH="$CLASSPATH:${FWDIR}/plugins/*"
-CLASSPATH="$CLASSPATH:${assembly_folder}/spark/*"
+
+# stable classpath for plugin JARs
+if [ -d "${FWDIR}/plugins" ]; then
+  lib_plugin_jars=`ls "${FWDIR}"/plugins/*`
+  lib_plugin_classpath=''
+  for J in $lib_plugin_jars; do
+    lib_plugin_classpath="${lib_plugin_classpath}:${J}"
+  done
+  CLASSPATH="$CLASSPATH${lib_plugin_classpath}"
+fi
+
+# stable classpath for Spark JARs
+lib_spark_jars=`ls "${assembly_folder}"/spark/*.jar`
+lib_spark_classpath=''
+for J in $lib_spark_jars; do
+  lib_spark_classpath="${lib_spark_classpath}:${J}"
+done
+CLASSPATH="$CLASSPATH${lib_spark_classpath}"
+
 CLASSPATH="$CLASSPATH:${MAIN_JAR}"
 
 # Add hadoop conf dir if given -- otherwise FileSystem.*, etc fail ! Note, this

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/conf/pio-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template
index 832b422..0b6b5b9 100644
--- a/conf/pio-env.sh.template
+++ b/conf/pio-env.sh.template
@@ -90,6 +90,9 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.4.1
+# Optional basic HTTP auth
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME=my-name
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_PASSWORD=my-secret
 # Elasticsearch 1.x Example
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=<elasticsearch_cluster_name>

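Assuming the usual PredictionIO convention that PIO_STORAGE_SOURCES_ELASTICSEARCH_* settings surface as keys on StorageClientConfig.properties, the new credentials could be picked up along these lines (the key names are illustrative, not confirmed by this diff):

    import org.apache.predictionio.data.storage.StorageClientConfig

    // Returns Some((username, password)) only when both settings are present.
    def basicAuth(config: StorageClientConfig): Option[(String, String)] =
      for {
        user <- config.properties.get("USERNAME")
        pass <- config.properties.get("PASSWORD")
      } yield (user, pass)
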
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
index 5420638..69525b1 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
@@ -28,6 +28,7 @@ import org.apache.predictionio.controller.{Engine, Utils}
 import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer}
 import org.apache.predictionio.data.storage.{EngineInstance, Storage}
 import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+import org.apache.predictionio.workflow.CleanupFunctions
 import org.apache.spark.rdd.RDD
 import org.json4s._
 import org.json4s.native.JsonMethods._
@@ -146,84 +147,89 @@ object BatchPredict extends Logging {
     engineInstance: EngineInstance,
     engine: Engine[_, _, _, Q, P, _]): Unit = {
 
-    val engineParams = engine.engineInstanceToEngineParams(
-      engineInstance, config.jsonExtractor)
+    try {
+      val engineParams = engine.engineInstanceToEngineParams(
+        engineInstance, config.jsonExtractor)
 
-    val kryo = KryoInstantiator.newKryoInjection
+      val kryo = KryoInstantiator.newKryoInjection
 
-    val modelsFromEngineInstance =
-      kryo.invert(modeldata.get(engineInstance.id).get.models).get.
-      asInstanceOf[Seq[Any]]
+      val modelsFromEngineInstance =
+        kryo.invert(modeldata.get(engineInstance.id).get.models).get.
+        asInstanceOf[Seq[Any]]
 
-    val prepareSparkContext = WorkflowContext(
-      batch = engineInstance.engineFactory,
-      executorEnv = engineInstance.env,
-      mode = "Batch Predict (model)",
-      sparkEnv = engineInstance.sparkConf)
+      val prepareSparkContext = WorkflowContext(
+        batch = engineInstance.engineFactory,
+        executorEnv = engineInstance.env,
+        mode = "Batch Predict (model)",
+        sparkEnv = engineInstance.sparkConf)
 
-    val models = engine.prepareDeploy(
-      prepareSparkContext,
-      engineParams,
-      engineInstance.id,
-      modelsFromEngineInstance,
-      params = WorkflowParams()
-    )
+      val models = engine.prepareDeploy(
+        prepareSparkContext,
+        engineParams,
+        engineInstance.id,
+        modelsFromEngineInstance,
+        params = WorkflowParams()
+      )
 
-    val algorithms = engineParams.algorithmParamsList.map { case (n, p) =>
-      Doer(engine.algorithmClassMap(n), p)
-    }
+      val algorithms = engineParams.algorithmParamsList.map { case (n, p) =>
+        Doer(engine.algorithmClassMap(n), p)
+      }
 
-    val servingParamsWithName = engineParams.servingParams
+      val servingParamsWithName = engineParams.servingParams
 
-    val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
-      servingParamsWithName._2)
+      val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
+        servingParamsWithName._2)
 
-    val runSparkContext = WorkflowContext(
-      batch = engineInstance.engineFactory,
-      executorEnv = engineInstance.env,
-      mode = "Batch Predict (runner)",
-      sparkEnv = engineInstance.sparkConf)
+      val runSparkContext = WorkflowContext(
+        batch = engineInstance.engineFactory,
+        executorEnv = engineInstance.env,
+        mode = "Batch Predict (runner)",
+        sparkEnv = engineInstance.sparkConf)
 
-    val inputRDD: RDD[String] = runSparkContext.
-      textFile(config.inputFilePath).
-      filter(_.trim.nonEmpty)
-    val queriesRDD: RDD[String] = config.queryPartitions match {
-      case Some(p) => inputRDD.repartition(p)
-      case None => inputRDD
-    }
+      val inputRDD: RDD[String] = runSparkContext.
+        textFile(config.inputFilePath).
+        filter(_.trim.nonEmpty)
+      val queriesRDD: RDD[String] = config.queryPartitions match {
+        case Some(p) => inputRDD.repartition(p)
+        case None => inputRDD
+      }
 
-    val predictionsRDD: RDD[String] = queriesRDD.map { queryString =>
-      val jsonExtractorOption = config.jsonExtractor
-      // Extract Query from Json
-      val query = JsonExtractor.extract(
-        jsonExtractorOption,
-        queryString,
-        algorithms.head.queryClass,
-        algorithms.head.querySerializer,
-        algorithms.head.gsonTypeAdapterFactories
-      )
-      // Deploy logic. First call Serving.supplement, then Algo.predict,
-      // finally Serving.serve.
-      val supplementedQuery = serving.supplementBase(query)
-      // TODO: Parallelize the following.
-      val predictions = algorithms.zip(models).map { case (a, m) =>
-        a.predictBase(m, supplementedQuery)
+      val predictionsRDD: RDD[String] = queriesRDD.map { queryString =>
+        val jsonExtractorOption = config.jsonExtractor
+        // Extract Query from Json
+        val query = JsonExtractor.extract(
+          jsonExtractorOption,
+          queryString,
+          algorithms.head.queryClass,
+          algorithms.head.querySerializer,
+          algorithms.head.gsonTypeAdapterFactories
+        )
+        // Deploy logic. First call Serving.supplement, then Algo.predict,
+        // finally Serving.serve.
+        val supplementedQuery = serving.supplementBase(query)
+        // TODO: Parallelize the following.
+        val predictions = algorithms.zip(models).map { case (a, m) =>
+          a.predictBase(m, supplementedQuery)
+        }
+        // Notice that it is by design to call Serving.serve with the
+        // *original* query.
+        val prediction = serving.serveBase(query, predictions)
+        // Combine query with prediction, so the batch results are
+        // self-descriptive.
+        val predictionJValue = JsonExtractor.toJValue(
+          jsonExtractorOption,
+          Map("query" -> query,
+              "prediction" -> prediction),
+          algorithms.head.querySerializer,
+          algorithms.head.gsonTypeAdapterFactories)
+        // Return JSON string
+        compact(render(predictionJValue))
       }
-      // Notice that it is by design to call Serving.serve with the
-      // *original* query.
-      val prediction = serving.serveBase(query, predictions)
-      // Combine query with prediction, so the batch results are
-      // self-descriptive.
-      val predictionJValue = JsonExtractor.toJValue(
-        jsonExtractorOption,
-        Map("query" -> query,
-            "prediction" -> prediction),
-        algorithms.head.querySerializer,
-        algorithms.head.gsonTypeAdapterFactories)
-      // Return JSON string
-      compact(render(predictionJValue))
-    }
 
-    predictionsRDD.saveAsTextFile(config.outputFilePath)
+      predictionsRDD.saveAsTextFile(config.outputFilePath)
+
+    } finally {
+      CleanupFunctions.run()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/core/src/main/scala/org/apache/predictionio/workflow/CleanupFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CleanupFunctions.scala b/core/src/main/scala/org/apache/predictionio/workflow/CleanupFunctions.scala
new file mode 100644
index 0000000..bdd8b01
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CleanupFunctions.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.workflow
+
+/** :: DeveloperApi ::
+  * Singleton object that collects anonymous functions to be
+  * executed to allow the process to end gracefully.
+  *
+  * For example, the Elasticsearch REST storage client
+  * maintains an internal connection pool that must
+  * be closed to allow the process to exit.
+  */
+object CleanupFunctions {
+  @volatile private var functions: Seq[() => Unit] = Seq.empty[() => Unit]
+
+  /** Add a function to be called during cleanup.
+    *
+    * {{{
+    * import org.apache.predictionio.workflow.CleanupFunctions
+    *
+    * CleanupFunctions.add { MyStorageClass.close }
+    * }}}
+    *
+    * @param f anonymous function containing cleanup code.
+    */
+  def add(f: () => Unit): Seq[() => Unit] = {
+    functions = functions :+ f
+    functions
+  }
+
+  /** Call all cleanup functions in order added.
+    *
+    * {{{
+    * import org.apache.predictionio.workflow.CleanupFunctions
+    *
+    * try {
+    *   // Much code that needs cleanup
+    *   // whether successful or error thrown.
+    * } finally {
+    *   CleanupFunctions.run()
+    * }
+    * }}}
+    *
+    * Any exception thrown by a cleanup function propagates to the caller.
+    */
+  def run(): Unit = {
+    functions.foreach { f => f() }
+  }
+}

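Putting the two scaladoc examples together, a driver program would register the Elasticsearch client's close and run the registered functions in a finally block, much as BatchPredict and CoreWorkflow now do. The client construction below is only a placeholder:

    import org.apache.http.HttpHost
    import org.apache.predictionio.workflow.CleanupFunctions
    import org.elasticsearch.client.RestClient

    object CleanupExample {
      def main(args: Array[String]): Unit = {
        val restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()
        // Register the close so the client's connection pool is released on exit.
        CleanupFunctions.add { () => restClient.close() }
        try {
          println("work that may succeed or throw")
        } finally {
          // Run every registered cleanup function so the process can end gracefully.
          CleanupFunctions.run()
        }
      }
    }
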
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala
index d956fd7..65270a2 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala
@@ -96,6 +96,7 @@ object CoreWorkflow {
       }
     } finally {
       logger.debug("Stopping SparkContext")
+      CleanupFunctions.run()
       sc.stop()
     }
   }
@@ -123,40 +124,43 @@ object CoreWorkflow {
       env,
       params.sparkEnv,
       mode.capitalize)
-    val evaluationInstanceId = evaluationInstances.insert(evaluationInstance)
 
-    logger.info(s"Starting evaluation instance ID: $evaluationInstanceId")
-
-    val evaluatorResult: BaseEvaluatorResult = EvaluationWorkflow.runEvaluation(
-      sc,
-      evaluation,
-      engine,
-      engineParamsList,
-      evaluator,
-      params)
-
-    if (evaluatorResult.noSave) {
-      logger.info(s"This evaluation result is not inserted into database: $evaluatorResult")
-    } else {
-      val evaluatedEvaluationInstance = evaluationInstance.copy(
-        status = "EVALCOMPLETED",
-        id = evaluationInstanceId,
-        endTime = DateTime.now,
-        evaluatorResults = evaluatorResult.toOneLiner,
-        evaluatorResultsHTML = evaluatorResult.toHTML,
-        evaluatorResultsJSON = evaluatorResult.toJSON
-      )
-
-      logger.info(s"Updating evaluation instance with result: $evaluatorResult")
+    try {
+      val evaluationInstanceId = evaluationInstances.insert(evaluationInstance)
+
+      logger.info(s"Starting evaluation instance ID: $evaluationInstanceId")
+
+      val evaluatorResult: BaseEvaluatorResult = EvaluationWorkflow.runEvaluation(
+        sc,
+        evaluation,
+        engine,
+        engineParamsList,
+        evaluator,
+        params)
+
+      if (evaluatorResult.noSave) {
+        logger.info(s"This evaluation result is not inserted into database: $evaluatorResult")
+      } else {
+        val evaluatedEvaluationInstance = evaluationInstance.copy(
+          status = "EVALCOMPLETED",
+          id = evaluationInstanceId,
+          endTime = DateTime.now,
+          evaluatorResults = evaluatorResult.toOneLiner,
+          evaluatorResultsHTML = evaluatorResult.toHTML,
+          evaluatorResultsJSON = evaluatorResult.toJSON
+        )
+
+        logger.info(s"Updating evaluation instance with result: $evaluatorResult")
+
+        evaluationInstances.update(evaluatedEvaluationInstance)
+      }
+      logger.info("runEvaluation completed")
 
-      evaluationInstances.update(evaluatedEvaluationInstance)
+    } finally {
+      logger.debug("Stop SparkContext")
+      CleanupFunctions.run()
+      sc.stop()
     }
-
-    logger.debug("Stop SparkContext")
-
-    sc.stop()
-
-    logger.info("runEvaluation completed")
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
index 52442a6..70e1973 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
@@ -19,6 +19,7 @@
 package org.apache.predictionio.data.storage
 
 import grizzled.slf4j.Logging
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.predictionio.annotation.DeveloperApi
 
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -214,7 +215,8 @@ object Storage extends Logging {
         }
       } catch {
         case e: Throwable =>
-          error(e.getMessage)
+          val stackTrace = ExceptionUtils.getStackTrace(e)
+          error(s"${e.getMessage}\n${stackTrace}\n\n")
           errors += 1
           r -> DataObjectMeta("", "")
       }
@@ -282,7 +284,9 @@ object Storage extends Logging {
       Some(ClientMeta(sourceType, client, clientConfig))
     } catch {
       case e: Throwable =>
-        error(s"Error initializing storage client for source ${k}", e)
+        val stackTrace = ExceptionUtils.getStackTrace(e)
+        error(s"Error initializing storage client for source ${k}.\n" +
+          s"${stackTrace}\n\n")
         errors += 1
         None
     }

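The error-handling change above amounts to logging the full stack trace rather than only the exception message. A minimal standalone sketch of that pattern (the object name is illustrative):

    import grizzled.slf4j.Logging
    import org.apache.commons.lang3.exception.ExceptionUtils

    object StackTraceLogging extends Logging {
      // Log the message plus the complete stack trace, matching the new Storage.scala behavior.
      def report(e: Throwable): Unit = {
        val stackTrace = ExceptionUtils.getStackTrace(e)
        error(s"${e.getMessage}\n${stackTrace}\n\n")
      }
    }
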
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/build.sbt
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/build.sbt b/storage/elasticsearch/build.sbt
index da4842e..b60e86e 100644
--- a/storage/elasticsearch/build.sbt
+++ b/storage/elasticsearch/build.sbt
@@ -30,7 +30,7 @@ libraryDependencies ++= Seq(
   "org.elasticsearch"       %% elasticsearchSparkArtifact.value % elasticsearchVersion.value
     exclude("org.apache.spark", "*"),
   "org.elasticsearch"        % "elasticsearch-hadoop-mr"  % elasticsearchVersion.value,
-  "org.scalatest"           %% "scalatest"                % "2.1.7" % "test")
+  "org.specs2"              %% "specs2"                   % "2.3.13" % "test")
 
 parallelExecution in Test := false
 
@@ -39,8 +39,10 @@ pomExtra := childrenPomExtra.value
 assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
 
 assemblyShadeRules in assembly := Seq(
-  ShadeRule.rename("org.apache.http.**" -> "shadeio.data.http.@1").inAll
-)
+  ShadeRule.rename("org.apache.http.**" ->
+    "org.apache.predictionio.shaded.org.apache.http.@1").inAll,
+  ShadeRule.rename("org.elasticsearch.client.**" ->
+    "org.apache.predictionio.shaded.org.elasticsearch.client.@1").inAll)
 
 // skip test in assembly
 test in assembly := {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 98c2781..73ef1d0 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -27,36 +27,30 @@ import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.AccessKey
 import org.apache.predictionio.data.storage.AccessKeys
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
+import org.elasticsearch.client.{ResponseException, RestClient}
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization.write
 
 import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
 
 /** Elasticsearch implementation of AccessKeys. */
-class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
+class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: String)
     extends AccessKeys with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "accesskeys"
 
-  val restClient = client.open()
-  try {
-    ESUtils.createIndex(restClient, index,
-      ESUtils.getNumberOfShards(config, index.toUpperCase),
-      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
-    val mappingJson =
-      (estype ->
-        ("_all" -> ("enabled" -> false)) ~
-        ("properties" ->
-          ("key" -> ("type" -> "keyword")) ~
-          ("events" -> ("type" -> "keyword"))))
-    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
-  } finally {
-    restClient.close()
-  }
+  ESUtils.createIndex(client, index,
+    ESUtils.getNumberOfShards(config, index.toUpperCase),
+    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> false)) ~
+      ("properties" ->
+        ("key" -> ("type" -> "keyword")) ~
+        ("events" -> ("type" -> "keyword"))))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def insert(accessKey: AccessKey): Option[String] = {
     val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
@@ -68,9 +62,8 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
     if (id.isEmpty) {
       return None
     }
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "GET",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -92,50 +85,41 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/$id", e)
         None
-    } finally {
-      restClient.close()
     }
   }
 
   def getAll(): Seq[AccessKey] = {
-    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("match_all" -> List.empty))
-      ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json)))
+      ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error("Failed to access to /$index/$estype/_search", e)
         Nil
-    } finally {
-      restClient.close()
     }
   }
 
   def getByAppid(appid: Int): Seq[AccessKey] = {
-    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("term" ->
             ("appid" -> appid)))
-      ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json)))
+      ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error("Failed to access to /$index/$estype/_search", e)
         Nil
-    } finally {
-      restClient.close()
     }
   }
 
   def update(accessKey: AccessKey): Unit = {
     val id = accessKey.key
-    val restClient = client.open()
     try {
       val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "POST",
         s"/$index/$estype/$id",
         Map("refresh" -> "true").asJava,
@@ -151,15 +135,12 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
     }
   }
 
   def delete(id: String): Unit = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "DELETE",
         s"/$index/$estype/$id",
         Map("refresh" -> "true").asJava)
@@ -173,8 +154,6 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/id", e)
-    } finally {
-      restClient.close()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index 6afed12..ba48065 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -27,37 +27,31 @@ import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.App
 import org.apache.predictionio.data.storage.Apps
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
+import org.elasticsearch.client.{ResponseException, RestClient}
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization.write
 
 import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
 
 /** Elasticsearch implementation of Items. */
-class ESApps(client: ESClient, config: StorageClientConfig, index: String)
+class ESApps(client: RestClient, config: StorageClientConfig, index: String)
     extends Apps with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "apps"
   private val seq = new ESSequences(client, config, index)
 
-  val restClient = client.open()
-  try {
-    ESUtils.createIndex(restClient, index,
-      ESUtils.getNumberOfShards(config, index.toUpperCase),
-      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
-    val mappingJson =
-      (estype ->
-        ("_all" -> ("enabled" -> false)) ~
-        ("properties" ->
-          ("id" -> ("type" -> "keyword")) ~
-          ("name" -> ("type" -> "keyword"))))
-    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
-  } finally {
-    restClient.close()
-  }
+  ESUtils.createIndex(client, index,
+    ESUtils.getNumberOfShards(config, index.toUpperCase),
+    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> false)) ~
+      ("properties" ->
+        ("id" -> ("type" -> "keyword")) ~
+        ("name" -> ("type" -> "keyword"))))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def insert(app: App): Option[Int] = {
     val id = app.id match {
@@ -77,9 +71,8 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
   }
 
   def get(id: Int): Option[App] = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "GET",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -101,20 +94,17 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/$id", e)
         None
-    } finally {
-      restClient.close()
     }
   }
 
   def getByName(name: String): Option[App] = {
-    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("term" ->
             ("name" -> name)))
       val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "POST",
         s"/$index/$estype/_search",
         Map.empty[String, String].asJava,
@@ -131,33 +121,27 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/_search", e)
         None
-    } finally {
-      restClient.close()
     }
   }
 
   def getAll(): Seq[App] = {
-    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("match_all" -> Nil))
-      ESUtils.getAll[App](restClient, index, estype, compact(render(json)))
+      ESUtils.getAll[App](client, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error("Failed to access to /$index/$estype/_search", e)
         Nil
-    } finally {
-      restClient.close()
     }
   }
 
   def update(app: App): Unit = {
     val id = app.id.toString
-    val restClient = client.open()
     try {
       val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON);
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "POST",
         s"/$index/$estype/$id",
         Map("refresh" -> "true").asJava,
@@ -173,15 +157,12 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
     }
   }
 
   def delete(id: Int): Unit = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "DELETE",
         s"/$index/$estype/$id",
         Map("refresh" -> "true").asJava)
@@ -195,8 +176,6 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/id", e)
-    } finally {
-      restClient.close()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index c142beb..b5eb5c8 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -27,35 +27,29 @@ import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.Channel
 import org.apache.predictionio.data.storage.Channels
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
+import org.elasticsearch.client.{ResponseException, RestClient}
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization.write
 
 import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
 
-class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
+class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
     extends Channels with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "channels"
   private val seq = new ESSequences(client, config, index)
-
-  val restClient = client.open()
-  try {
-    ESUtils.createIndex(restClient, index,
-      ESUtils.getNumberOfShards(config, index.toUpperCase),
-      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
-    val mappingJson =
-      (estype ->
-        ("_all" -> ("enabled" -> false)) ~
-        ("properties" ->
-          ("name" -> ("type" -> "keyword"))))
-    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
-  } finally {
-    restClient.close()
-  }
+  
+  ESUtils.createIndex(client, index,
+    ESUtils.getNumberOfShards(config, index.toUpperCase),
+    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> false)) ~
+      ("properties" ->
+        ("name" -> ("type" -> "keyword"))))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def insert(channel: Channel): Option[Int] = {
     val id = channel.id match {
@@ -75,9 +69,8 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
   }
 
   def get(id: Int): Option[Channel] = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "GET",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -99,34 +92,28 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/$id", e)
         None
-    } finally {
-      restClient.close()
     }
   }
 
   def getByAppid(appid: Int): Seq[Channel] = {
-    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("term" ->
             ("appid" -> appid)))
-      ESUtils.getAll[Channel](restClient, index, estype, compact(render(json)))
+      ESUtils.getAll[Channel](client, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/_search", e)
         Nil
-    } finally {
-      restClient.close()
     }
   }
 
   def update(channel: Channel): Boolean = {
     val id = channel.id.toString
-    val restClient = client.open()
     try {
       val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "POST",
         s"/$index/$estype/$id",
         Map("refresh" -> "true").asJava,
@@ -144,15 +131,12 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
         false
-    } finally {
-      restClient.close()
     }
   }
 
   def delete(id: Int): Unit = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "DELETE",
         s"/$index/$estype/$id",
         Map("refresh" -> "true").asJava)
@@ -166,8 +150,6 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index de474c1..eec5b64 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -28,46 +28,40 @@ import org.apache.predictionio.data.storage.EngineInstance
 import org.apache.predictionio.data.storage.EngineInstanceSerializer
 import org.apache.predictionio.data.storage.EngineInstances
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
+import org.elasticsearch.client.{ResponseException, RestClient}
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization.write
 
 import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
 
-class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: String)
+class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: String)
     extends EngineInstances with Logging {
   implicit val formats = DefaultFormats + new EngineInstanceSerializer
   private val estype = "engine_instances"
-
-  val restClient = client.open()
-  try {
-    ESUtils.createIndex(restClient, index,
-      ESUtils.getNumberOfShards(config, index.toUpperCase),
-      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
-    val mappingJson =
-      (estype ->
-        ("_all" -> ("enabled" -> false)) ~
-        ("properties" ->
-          ("status" -> ("type" -> "keyword")) ~
-          ("startTime" -> ("type" -> "date")) ~
-          ("endTime" -> ("type" -> "date")) ~
-          ("engineId" -> ("type" -> "keyword")) ~
-          ("engineVersion" -> ("type" -> "keyword")) ~
-          ("engineVariant" -> ("type" -> "keyword")) ~
-          ("engineFactory" -> ("type" -> "keyword")) ~
-          ("batch" -> ("type" -> "keyword")) ~
-          ("dataSourceParams" -> ("type" -> "keyword")) ~
-          ("preparatorParams" -> ("type" -> "keyword")) ~
-          ("algorithmsParams" -> ("type" -> "keyword")) ~
-          ("servingParams" -> ("type" -> "keyword")) ~
-          ("status" -> ("type" -> "keyword"))))
-    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
-  } finally {
-    restClient.close()
-  }
+  
+  ESUtils.createIndex(client, index,
+    ESUtils.getNumberOfShards(config, index.toUpperCase),
+    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> false)) ~
+      ("properties" ->
+        ("status" -> ("type" -> "keyword")) ~
+        ("startTime" -> ("type" -> "date")) ~
+        ("endTime" -> ("type" -> "date")) ~
+        ("engineId" -> ("type" -> "keyword")) ~
+        ("engineVersion" -> ("type" -> "keyword")) ~
+        ("engineVariant" -> ("type" -> "keyword")) ~
+        ("engineFactory" -> ("type" -> "keyword")) ~
+        ("batch" -> ("type" -> "keyword")) ~
+        ("dataSourceParams" -> ("type" -> "keyword")) ~
+        ("preparatorParams" -> ("type" -> "keyword")) ~
+        ("algorithmsParams" -> ("type" -> "keyword")) ~
+        ("servingParams" -> ("type" -> "keyword")) ~
+        ("status" -> ("type" -> "keyword"))))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def insert(i: EngineInstance): String = {
     val id = i.id match {
@@ -88,10 +82,9 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
   }
 
   def preInsert(): Option[String] = {
-    val restClient = client.open()
     try {
       val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "POST",
         s"/$index/$estype/",
         Map("refresh" -> "true").asJava,
@@ -109,15 +102,12 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
       case e: IOException =>
         error(s"Failed to create $index/$estype", e)
         None
-    } finally {
-      restClient.close()
     }
   }
 
   def get(id: String): Option[EngineInstance] = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "GET",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -139,24 +129,19 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/$id", e)
         None
-    } finally {
-      restClient.close()
     }
   }
 
   def getAll(): Seq[EngineInstance] = {
-    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("match_all" -> List.empty))
-      ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json)))
+      ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error("Failed to access to /$index/$estype/_search", e)
         Nil
-    } finally {
-      restClient.close()
     }
   }
 
@@ -164,7 +149,6 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
     engineId: String,
     engineVersion: String,
     engineVariant: String): Seq[EngineInstance] = {
-    val restClient = client.open()
     try {
       val json =
         ("query" ->
@@ -181,13 +165,11 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
               ("sort" -> List(
                 ("startTime" ->
                   ("order" -> "desc"))))
-      ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json)))
+      ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/_search", e)
         Nil
-    } finally {
-      restClient.close()
     }
   }
 
@@ -202,10 +184,9 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
 
   def update(i: EngineInstance): Unit = {
     val id = i.id
-    val restClient = client.open()
     try {
       val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "POST",
         s"/$index/$estype/$id",
         Map("refresh" -> "true").asJava,
@@ -221,15 +202,12 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
     }
   }
 
   def delete(id: String): Unit = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "DELETE",
         s"/$index/$estype/$id",
         Map("refresh" -> "true").asJava)
@@ -243,8 +221,6 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 9b19cf4..1706583 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -29,43 +29,37 @@ import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
 import org.apache.predictionio.data.storage.EvaluationInstances
 import org.apache.predictionio.data.storage.StorageClientConfig
 import org.apache.predictionio.data.storage.StorageClientException
-import org.elasticsearch.client.RestClient
+import org.elasticsearch.client.{ResponseException, RestClient}
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization.write
 
 import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
 
-class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index: String)
+class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, index: String)
     extends EvaluationInstances with Logging {
   implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
   private val estype = "evaluation_instances"
   private val seq = new ESSequences(client, config, index)
-
-  val restClient = client.open()
-  try {
-    ESUtils.createIndex(restClient, index,
-      ESUtils.getNumberOfShards(config, index.toUpperCase),
-      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
-    val mappingJson =
-      (estype ->
-        ("_all" -> ("enabled" -> false)) ~
-        ("properties" ->
-          ("status" -> ("type" -> "keyword")) ~
-          ("startTime" -> ("type" -> "date")) ~
-          ("endTime" -> ("type" -> "date")) ~
-          ("evaluationClass" -> ("type" -> "keyword")) ~
-          ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
-          ("batch" -> ("type" -> "keyword")) ~
-          ("evaluatorResults" -> ("type" -> "text")) ~
-          ("evaluatorResultsHTML" -> ("enabled" -> false)) ~
-          ("evaluatorResultsJSON" -> ("enabled" -> false))))
-    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
-  } finally {
-    restClient.close()
-  }
+  
+  ESUtils.createIndex(client, index,
+    ESUtils.getNumberOfShards(config, index.toUpperCase),
+    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> false)) ~
+      ("properties" ->
+        ("status" -> ("type" -> "keyword")) ~
+        ("startTime" -> ("type" -> "date")) ~
+        ("endTime" -> ("type" -> "date")) ~
+        ("evaluationClass" -> ("type" -> "keyword")) ~
+        ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
+        ("batch" -> ("type" -> "keyword")) ~
+        ("evaluatorResults" -> ("type" -> "text")) ~
+        ("evaluatorResultsHTML" -> ("enabled" -> false)) ~
+        ("evaluatorResultsJSON" -> ("enabled" -> false))))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def insert(i: EvaluationInstance): String = {
     val id = i.id match {
@@ -85,9 +79,8 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
   }
 
   def get(id: String): Option[EvaluationInstance] = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "GET",
         s"/$index/$estype/$id",
         Map.empty[String, String].asJava)
@@ -109,29 +102,23 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
       case e: IOException =>
         error(s"Failed to access to /$index/$estype/$id", e)
         None
-    } finally {
-      restClient.close()
     }
   }
 
   def getAll(): Seq[EvaluationInstance] = {
-    val restClient = client.open()
     try {
       val json =
         ("query" ->
           ("match_all" -> List.empty))
-      ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json)))
+      ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error("Failed to access to /$index/$estype/_search", e)
         Nil
-    } finally {
-      restClient.close()
     }
   }
 
   def getCompleted(): Seq[EvaluationInstance] = {
-    val restClient = client.open()
     try {
       val json =
         ("query" ->
@@ -140,22 +127,19 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
             ("sort" ->
               ("startTime" ->
                 ("order" -> "desc")))
-      ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json)))
+      ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>
         error("Failed to access to /$index/$estype/_search", e)
         Nil
-    } finally {
-      restClient.close()
     }
   }
 
   def update(i: EvaluationInstance): Unit = {
     val id = i.id
-    val restClient = client.open()
     try {
       val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "POST",
         s"/$index/$estype/$id",
         Map("refresh" -> "true").asJava,
@@ -171,15 +155,12 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
     }
   }
 
   def delete(id: String): Unit = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "DELETE",
         s"/$index/$estype/$id",
         Map("refresh" -> "true").asJava)
@@ -193,8 +174,6 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
     } catch {
       case e: IOException =>
         error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index 6c0c4a7..5e1f4c1 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -28,6 +28,7 @@ import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.Event
 import org.apache.predictionio.data.storage.LEvents
 import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.{ResponseException, RestClient}
 import org.joda.time.DateTime
 import org.json4s._
 import org.json4s.JsonDSL._
@@ -37,10 +38,9 @@ import org.json4s.ext.JodaTimeSerializers
 import grizzled.slf4j.Logging
 import org.apache.http.message.BasicHeader
 
-class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String)
+class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String)
     extends LEvents with Logging {
   implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
-  val restClient = client.open()
 
   def getEsType(appId: Int, channelId: Option[Int] = None): String = {
     channelId.map { ch =>
@@ -52,7 +52,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
 
   override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
     val estype = getEsType(appId, channelId)
-    ESUtils.createIndex(restClient, index,
+    ESUtils.createIndex(client, index,
       ESUtils.getNumberOfShards(config, index.toUpperCase),
       ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val json =
@@ -71,7 +71,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
           ("tags" -> ("type" -> "keyword")) ~
           ("prId" -> ("type" -> "keyword")) ~
           ("creationTime" -> ("type" -> "date"))))
-    ESUtils.createMapping(restClient, index, estype, compact(render(json)))
+    ESUtils.createMapping(client, index, estype, compact(render(json)))
     true
   }
 
@@ -82,7 +82,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
         ("query" ->
           ("match_all" -> List.empty))
       val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
-      restClient.performRequest(
+      client.performRequest(
         "POST",
         s"/$index/$estype/_delete_by_query",
         Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
@@ -99,9 +99,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
     }
   }
 
-  override def close(): Unit = {
-    restClient.close()
-  }
+  override def close(): Unit = {}
 
   override def futureInsert(
     event: Event,
@@ -126,7 +124,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
           ("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~
           ("properties" -> write(event.properties.toJObject))
         val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
-        val response = restClient.performRequest(
+        val response = client.performRequest(
           "POST",
           s"/$index/$estype/$id",
           Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
@@ -185,7 +183,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
         }.mkString("", "\n", "\n")
 
         val entity = new StringEntity(json)
-        val response = restClient.performRequest(
+        val response = client.performRequest(
           "POST",
           "/_bulk",
           Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
@@ -215,6 +213,29 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
     }
   }
 
+  private def exists(client: RestClient, estype: String, id: Int): Boolean = {
+    try {
+      client.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+          case 200 => true
+          case _ => false
+        }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => false
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            false
+        }
+      case e: IOException =>
+        error(s"Failed to access to $index/$estype/$id", e)
+        false
+    }
+  }
+
   override def futureGet(
     eventId: String,
     appId: Int,
@@ -227,7 +248,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
             ("term" ->
               ("eventId" -> eventId)))
         val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
-        val response = restClient.performRequest(
+        val response = client.performRequest(
           "POST",
           s"/$index/$estype/_search",
           Map.empty[String, String].asJava,
@@ -260,7 +281,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
             ("term" ->
               ("eventId" -> eventId)))
         val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
-        val response = restClient.performRequest(
+        val response = client.performRequest(
           "POST",
           s"/$index/$estype/_delete_by_query",
           Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
@@ -301,8 +322,8 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
           startTime, untilTime, entityType, entityId,
           eventNames, targetEntityType, targetEntityId, reversed)
         limit.getOrElse(20) match {
-          case -1 => ESUtils.getEventAll(restClient, index, estype, query).toIterator
-          case size => ESUtils.getEvents(restClient, index, estype, query, size).toIterator
+          case -1 => ESUtils.getEventAll(client, index, estype, query).toIterator
+          case size => ESUtils.getEvents(client, index, estype, query, size).toIterator
         }
       } catch {
         case e: IOException =>

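For orientation, a caller-side sketch of how the LEvents API backed by ESLEvents might be exercised once the shared client is in place. The object name, event lookup and its arguments are illustrative assumptions, and the snippet sits under the org.apache.predictionio namespace so the developer-level futureGet call resolves; it is a sketch, not part of this change.

    package org.apache.predictionio.data.storage.elasticsearch

    import scala.concurrent.Await
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    import org.apache.predictionio.data.storage.Storage

    // Hypothetical helper: fetch one event through the LEvents interface.
    // ESLEvents reuses the shared RestClient, so there is no per-call open/close.
    object LEventsUsageSketch {
      def lookup(eventId: String, appId: Int): Unit = {
        val events = Storage.getLEvents()
        val maybeEvent = Await.result(
          events.futureGet(eventId, appId, None),
          10.seconds)
        println(maybeEvent)
      }
    }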
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
index 9f0a188..75f7639 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
@@ -41,7 +41,7 @@ import org.json4s.native.JsonMethods._
 import org.json4s.ext.JodaTimeSerializers
 
 
-class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
+class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
     extends PEvents {
   implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
 
@@ -107,8 +107,6 @@ class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
     eventIds: RDD[String],
     appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
     val estype = getEsType(appId, channelId)
-    val restClient = client.open()
-    try {
       eventIds.foreachPartition { iter =>
         iter.foreach { eventId =>
           try {
@@ -117,28 +115,23 @@ class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
                 ("term" ->
                   ("eventId" -> eventId)))
             val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
-            val response = restClient.performRequest(
+            val response = client.performRequest(
               "POST",
               s"/$index/$estype/_delete_by_query",
               Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
               entity)
-            val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-            val result = (jsonResponse \ "result").extract[String]
-            result match {
-              case "deleted" => true
-              case _ =>
-                logger.error(s"[$result] Failed to update $index/$estype:$eventId")
-                false
-            }
-          } catch {
-            case e: IOException =>
-              logger.error(s"Failed to update $index/$estype:$eventId", e)
-              false
+          val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+          val result = (jsonResponse \ "result").extract[String]
+          result match {
+            case "deleted" =>
+            case _ =>
+              logger.error(s"[$result] Failed to update $index/$estype:$eventId")
           }
+        } catch {
+          case e: IOException =>
+            logger.error(s"Failed to update $index/$estype:$eventId", e)
         }
       }
-    } finally {
-      restClient.close()
     }
   }
 

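A driver-side sketch of how this delete path might be invoked. The object name and RDD contents are assumptions; Storage.getPEvents() resolves to ESPEvents only when Elasticsearch backs the event data.

    package org.apache.predictionio.data.storage.elasticsearch

    import org.apache.predictionio.data.storage.Storage
    import org.apache.spark.SparkContext

    // Hypothetical driver-side helper: remove a handful of events by ID.
    // Each partition issues _delete_by_query requests through the shared client.
    object PEventsDeleteSketch {
      def deleteSome(sc: SparkContext, appId: Int): Unit = {
        val eventIds = sc.parallelize(Seq("event-id-1", "event-id-2"))
        Storage.getPEvents().delete(eventIds, appId, None)(sc)
      }
    }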
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index 9fd31a3..018ef85 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -35,30 +35,24 @@ import org.json4s.native.Serialization.write
 
 import grizzled.slf4j.Logging
 
-class ESSequences(client: ESClient, config: StorageClientConfig, index: String) extends Logging {
+class ESSequences(client: RestClient, config: StorageClientConfig, index: String) extends Logging {
   implicit val formats = DefaultFormats
   private val estype = "sequences"
 
-  val restClient = client.open()
-  try {
-    ESUtils.createIndex(restClient, index,
-      ESUtils.getNumberOfShards(config, index.toUpperCase),
-      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
-    val mappingJson =
-      (estype ->
-        ("_all" -> ("enabled" -> false)) ~
-        ("properties" ->
-          ("n" -> ("enabled" -> false))))
-    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
-  } finally {
-    restClient.close()
-  }
+  ESUtils.createIndex(client, index,
+    ESUtils.getNumberOfShards(config, index.toUpperCase),
+    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> false)) ~
+      ("properties" ->
+        ("n" -> ("enabled" -> false))))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def genNext(name: String): Long = {
-    val restClient = client.open()
     try {
       val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
+      val response = client.performRequest(
         "POST",
         s"/$index/$estype/$name",
         Map("refresh" -> "false").asJava,
@@ -76,8 +70,6 @@ class ESSequences(client: ESClient, config: StorageClientConfig, index: String)
     } catch {
       case e: IOException =>
         throw new StorageClientException(s"Failed to update $index/$estype/$name", e)
-    } finally {
-      restClient.close()
     }
   }
 }

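The sequence generator relies on Elasticsearch bumping a document's _version every time the same ID is re-indexed, so that version number can serve as a monotonically increasing value. A hedged sketch of the idea follows; the object name and the response parsing are illustrative rather than the exact production code.

    package org.apache.predictionio.data.storage.elasticsearch

    import scala.collection.JavaConverters._

    import org.apache.http.entity.ContentType
    import org.apache.http.nio.entity.NStringEntity
    import org.apache.http.util.EntityUtils
    import org.elasticsearch.client.RestClient
    import org.json4s._
    import org.json4s.native.JsonMethods._

    object SequenceSketch {
      // Index {"n": name} under a fixed document ID and read back _version,
      // which Elasticsearch increments on every write to that ID.
      def nextValue(client: RestClient, index: String, name: String): Long = {
        implicit val formats = DefaultFormats
        val entity = new NStringEntity(s"""{"n": "$name"}""", ContentType.APPLICATION_JSON)
        val response = client.performRequest(
          "POST",
          s"/$index/sequences/$name",
          Map("refresh" -> "false").asJava,
          entity)
        val json = parse(EntityUtils.toString(response.getEntity))
        (json \ "_version").extract[Long]
      }
    }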
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
index 647d180..d2c69b9 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
@@ -18,27 +18,84 @@
 package org.apache.predictionio.data.storage.elasticsearch
 
 import org.apache.http.HttpHost
+import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
+import org.apache.http.impl.client.BasicCredentialsProvider
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
 import org.apache.predictionio.data.storage.BaseStorageClient
 import org.apache.predictionio.data.storage.StorageClientConfig
 import org.apache.predictionio.data.storage.StorageClientException
+import org.apache.predictionio.workflow.CleanupFunctions
 import org.elasticsearch.client.RestClient
+import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
 
 import grizzled.slf4j.Logging
 
-case class ESClient(hosts: Seq[HttpHost]) {
-  def open(): RestClient = {
+object ESClient extends Logging {
+  private var _sharedRestClient: Option[RestClient] = None
+
+  def open(
+    hosts: Seq[HttpHost],
+    basicAuth: Option[(String, String)] = None): RestClient = {
     try {
-      RestClient.builder(hosts: _*).build()
+      val newClient = _sharedRestClient match {
+        case Some(c)  => c
+        case None     => {
+          var builder = RestClient.builder(hosts: _*)
+          builder = basicAuth match {
+            case Some((username, password)) => builder.setHttpClientConfigCallback(
+              new BasicAuthProvider(username, password))
+            case None                       => builder
+          }
+          builder.build()
+        }
+      }
+      _sharedRestClient = Some(newClient)
+      newClient
     } catch {
       case e: Throwable =>
         throw new StorageClientException(e.getMessage, e)
     }
   }
+
+  def close(): Unit = {
+    _sharedRestClient.foreach { client =>
+      client.close()
+      _sharedRestClient = None
+    }
+  }
 }
 
-class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
-    with Logging {
+class StorageClient(val config: StorageClientConfig)
+  extends BaseStorageClient with Logging {
+
   override val prefix = "ES"
 
-  val client = ESClient(ESUtils.getHttpHosts(config))
+  val usernamePassword = (
+    config.properties.get("USERNAME"),
+    config.properties.get("PASSWORD"))
+  val optionalBasicAuth: Option[(String, String)] = usernamePassword match {
+    case (None, None)         => None
+    case (username, password) => Some(
+      (username.getOrElse(""), password.getOrElse("")))
+  }
+
+  CleanupFunctions.add { ESClient.close }
+
+  val client = ESClient.open(ESUtils.getHttpHosts(config), optionalBasicAuth)
+}
+
+class BasicAuthProvider(
+    val username: String,
+    val password: String)
+  extends HttpClientConfigCallback {
+
+  val credentialsProvider = new BasicCredentialsProvider()
+  credentialsProvider.setCredentials(
+    AuthScope.ANY,
+    new UsernamePasswordCredentials(username, password))
+
+  override def customizeHttpClient(
+    httpClientBuilder: HttpAsyncClientBuilder
+  ): HttpAsyncClientBuilder = {
+    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
+  }
 }

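With the singleton in place, a minimal usage sketch, assuming a local cluster and placeholder credentials. In a full deployment StorageClient reads USERNAME and PASSWORD from the storage source properties and registers ESClient.close with CleanupFunctions, so callers normally never touch ESClient directly.

    import org.apache.http.HttpHost
    import org.apache.predictionio.data.storage.elasticsearch.ESClient

    object SingletonClientSketch {
      def main(args: Array[String]): Unit = {
        // Reuse (or lazily create) the process-wide RestClient; host, port and
        // credentials below are placeholders.
        val client = ESClient.open(
          Seq(new HttpHost("localhost", 9200, "http")),
          basicAuth = Some(("elastic", "changeme")))

        // ... perform requests through `client` ...

        // Release the singleton once storage access is no longer needed.
        ESClient.close()
      }
    }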
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClientSpec.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClientSpec.scala b/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClientSpec.scala
new file mode 100644
index 0000000..91cc4d6
--- /dev/null
+++ b/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClientSpec.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import org.apache.predictionio.data.storage.{App, Apps, Storage, StorageClientConfig}
+import org.elasticsearch.client.{RestClient, Response}
+import scala.collection.JavaConverters._
+
+import org.specs2._
+import org.specs2.specification.Step
+
+class ElasticsearchStorageClientSpec extends Specification {
+  def is = s2"""
+
+  PredictionIO Storage Elasticsearch REST Client Specification ${getESClient}
+
+  """
+
+  def getESClient = sequential ^ s2"""
+
+    StorageClient should
+    - initialize metadata store ${initMetadataStore(appsDO)}
+
+  """
+
+  def initMetadataStore(appsDO: Apps) = sequential ^ s2"""
+
+    creates an app ${createsApp(appsDO)}
+    gets apps ${getApps(appsDO)}
+
+  """
+
+  val indexName = "test_pio_storage_meta_" + hashCode
+
+  def appsDO: Apps = Storage.getDataObject[Apps](StorageTestUtils.elasticsearchSourceName, indexName)
+
+  def createsApp(appsDO: Apps) = {
+    val newId: Int = 123
+    val newApp: App = App(newId, "test1", Some("App for ElasticsearchStorageClientSpec"))
+    val id: Option[Int] = appsDO.insert(newApp)
+    val createdApp: Option[App] = appsDO.get(id.get)
+    createdApp.get.id mustEqual newId
+  }
+
+  def getApps(appsDO: Apps) = {
+    val apps: Seq[App] = appsDO.getAll()
+    println(s"Storage.config ${Storage.config}")
+    println(s"getApps ${apps}")
+    apps must beAnInstanceOf[Seq[App]]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageTestUtils.scala b/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageTestUtils.scala
new file mode 100644
index 0000000..f891c58
--- /dev/null
+++ b/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageTestUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+object StorageTestUtils {
+  val elasticsearchSourceName = "ELASTICSEARCH"
+
+  def dropESIndex(namespace: String): Unit = {
+    // TODO
+  }
+
+}

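One possible way to fill in the TODO above, reusing the shared client against a local test cluster; this is a sketch that could slot into StorageTestUtils, not the committed implementation.

    import org.apache.http.HttpHost

    // Sketch: delete the whole test index through the shared low-level client.
    // Assumes this lives in the elasticsearch storage package so ESClient is in
    // scope, and that the test cluster runs on localhost:9200.
    def dropESIndex(namespace: String): Unit = {
      val client = ESClient.open(Seq(new HttpHost("localhost", 9200, "http")))
      client.performRequest("DELETE", s"/$namespace")
    }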
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/hdfs/project/build.properties
----------------------------------------------------------------------
diff --git a/storage/hdfs/project/build.properties b/storage/hdfs/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/storage/hdfs/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
index c101d3f..0372a44 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
@@ -25,6 +25,7 @@ import org.apache.predictionio.data.SparkVersionDependent
 import org.apache.predictionio.tools.Runner
 import org.apache.predictionio.workflow.WorkflowContext
 import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.predictionio.workflow.CleanupFunctions
 
 import grizzled.slf4j.Logging
 import org.apache.spark.sql.SaveMode
@@ -69,40 +70,45 @@ object EventsToFile extends Logging {
       }
     }
     parser.parse(args, EventsToFileArgs()) map { args =>
-      // get channelId
-      val channels = Storage.getMetaDataChannels
-      val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
+      try {
+        // get channelId
+        val channels = Storage.getMetaDataChannels
+        val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
 
-      val channelId: Option[Int] = args.channel.map { ch =>
-        if (!channelMap.contains(ch)) {
-          error(s"Channel ${ch} doesn't exist in this app.")
-          sys.exit(1)
+        val channelId: Option[Int] = args.channel.map { ch =>
+          if (!channelMap.contains(ch)) {
+            error(s"Channel ${ch} doesn't exist in this app.")
+            sys.exit(1)
+          }
+
+          channelMap(ch)
         }
 
-        channelMap(ch)
-      }
+        val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
 
-      val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
+        WorkflowUtils.modifyLogging(verbose = args.verbose)
+        @transient lazy implicit val formats = Utils.json4sDefaultFormats +
+          new EventJson4sSupport.APISerializer
+        val sc = WorkflowContext(
+          mode = "Export",
+          batch = "App ID " + args.appId + channelStr,
+          executorEnv = Runner.envStringToMap(args.env))
+        val sqlSession = SparkVersionDependent.sqlSession(sc)
+        val events = Storage.getPEvents()
+        val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
+        val jsonStringRdd = eventsRdd.map(write(_))
+        if (args.format == "json") {
+          jsonStringRdd.saveAsTextFile(args.outputPath)
+        } else {
+          val jsonDf = sqlSession.read.json(jsonStringRdd)
+          jsonDf.write.mode(SaveMode.ErrorIfExists).parquet(args.outputPath)
+        }
+        info(s"Events are exported to ${args.outputPath}/.")
+        info("Done.")
 
-      WorkflowUtils.modifyLogging(verbose = args.verbose)
-      @transient lazy implicit val formats = Utils.json4sDefaultFormats +
-        new EventJson4sSupport.APISerializer
-      val sc = WorkflowContext(
-        mode = "Export",
-        batch = "App ID " + args.appId + channelStr,
-        executorEnv = Runner.envStringToMap(args.env))
-      val sqlSession = SparkVersionDependent.sqlSession(sc)
-      val events = Storage.getPEvents()
-      val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
-      val jsonStringRdd = eventsRdd.map(write(_))
-      if (args.format == "json") {
-        jsonStringRdd.saveAsTextFile(args.outputPath)
-      } else {
-        val jsonDf = sqlSession.read.json(jsonStringRdd)
-        jsonDf.write.mode(SaveMode.ErrorIfExists).parquet(args.outputPath)
+      } finally {
+        CleanupFunctions.run()
       }
-      info(s"Events are exported to ${args.outputPath}/.")
-      info("Done.")
     }
   }
 }

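Both the export and import tools now wrap their work in try/finally so that registered storage cleanups, such as closing the singleton Elasticsearch client, always run. A minimal sketch of the kind of registry this pattern relies on, assuming a simple list of thunks; the actual CleanupFunctions added in this commit may differ.

    // Hypothetical registry; the real org.apache.predictionio.workflow.CleanupFunctions
    // may be implemented differently.
    object CleanupFunctionsSketch {
      private var functions: Seq[() => Unit] = Seq.empty

      // Register a cleanup to be executed when the driver finishes.
      def add(f: () => Unit): Unit = synchronized {
        functions = functions :+ f
      }

      // Run and clear all registered cleanups, typically from a finally block.
      def run(): Unit = synchronized {
        functions.foreach(f => f())
        functions = Seq.empty
      }
    }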
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala b/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
index 4b333ab..11f5a52 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
@@ -25,6 +25,7 @@ import org.apache.predictionio.data.storage.Storage
 import org.apache.predictionio.tools.Runner
 import org.apache.predictionio.workflow.WorkflowContext
 import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.predictionio.workflow.CleanupFunctions
 
 import grizzled.slf4j.Logging
 import org.json4s.native.Serialization._
@@ -66,41 +67,46 @@ object FileToEvents extends Logging {
       }
     }
     parser.parse(args, FileToEventsArgs()) map { args =>
-      // get channelId
-      val channels = Storage.getMetaDataChannels
-      val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
+      try {
+        // get channelId
+        val channels = Storage.getMetaDataChannels
+        val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
 
-      val channelId: Option[Int] = args.channel.map { ch =>
-        if (!channelMap.contains(ch)) {
-          error(s"Channel ${ch} doesn't exist in this app.")
-          sys.exit(1)
+        val channelId: Option[Int] = args.channel.map { ch =>
+          if (!channelMap.contains(ch)) {
+            error(s"Channel ${ch} doesn't exist in this app.")
+            sys.exit(1)
+          }
+
+          channelMap(ch)
         }
 
-        channelMap(ch)
-      }
+        val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
 
-      val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
+        WorkflowUtils.modifyLogging(verbose = args.verbose)
+        @transient lazy implicit val formats = Utils.json4sDefaultFormats +
+          new EventJson4sSupport.APISerializer
+        val sc = WorkflowContext(
+          mode = "Import",
+          batch = "App ID " + args.appId + channelStr,
+          executorEnv = Runner.envStringToMap(args.env))
+        val rdd = sc.textFile(args.inputPath).filter(_.trim.nonEmpty).map { json =>
+          Try(read[Event](json)).recoverWith {
+            case e: Throwable =>
+              error(s"\nmalformed json => $json")
+              Failure(e)
+          }.get
+        }
+        val events = Storage.getPEvents()
+        events.write(events = rdd,
+          appId = args.appId,
+          channelId = channelId)(sc)
+        info("Events are imported.")
+        info("Done.")
 
-      WorkflowUtils.modifyLogging(verbose = args.verbose)
-      @transient lazy implicit val formats = Utils.json4sDefaultFormats +
-        new EventJson4sSupport.APISerializer
-      val sc = WorkflowContext(
-        mode = "Import",
-        batch = "App ID " + args.appId + channelStr,
-        executorEnv = Runner.envStringToMap(args.env))
-      val rdd = sc.textFile(args.inputPath).filter(_.trim.nonEmpty).map { json =>
-        Try(read[Event](json)).recoverWith {
-          case e: Throwable =>
-            error(s"\nmalformed json => $json")
-            Failure(e)
-        }.get
+      } finally {
+        CleanupFunctions.run()
       }
-      val events = Storage.getPEvents()
-      events.write(events = rdd,
-        appId = args.appId,
-        channelId = channelId)(sc)
-      info("Events are imported.")
-      info("Done.")
     }
   }
 }