You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/02/11 22:00:09 UTC

[02/10] incubator-predictionio git commit: Add support for Elasticsearch 5.x

Add support for Elasticsearch 5.x


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/36b79d7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/36b79d7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/36b79d7d

Branch: refs/heads/feature/es5
Commit: 36b79d7d029d086d14b74c675308fca948c6aa85
Parents: 09d04ed
Author: takahiro-hagino <ta...@bizreach.co.jp>
Authored: Mon Jan 16 11:50:48 2017 +0900
Committer: takahiro-hagino <ta...@bizreach.co.jp>
Committed: Mon Jan 16 13:41:44 2017 +0900

----------------------------------------------------------------------
 bin/install.sh                                  |   4 +-
 bin/pio-start-all                               |  94 ++++---
 bin/pio-stop-all                                |  50 ++--
 build.sbt                                       |   4 +-
 conf/pio-env.sh.template                        |   8 +-
 core/build.sbt                                  |  40 +--
 .../predictionio/workflow/CreateWorkflow.scala  |  74 ++---
 data/build.sbt                                  |  49 ++--
 .../storage/elasticsearch/ESAccessKeys.scala    | 140 ++++++----
 .../data/storage/elasticsearch/ESApps.scala     | 162 +++++++----
 .../data/storage/elasticsearch/ESChannels.scala | 132 +++++----
 .../elasticsearch/ESEngineInstances.scala       | 233 ++++++++++------
 .../elasticsearch/ESEngineManifests.scala       | 111 +++++---
 .../elasticsearch/ESEvaluationInstances.scala   | 176 +++++++-----
 .../storage/elasticsearch/ESEventsUtil.scala    | 125 +++++++++
 .../data/storage/elasticsearch/ESLEvents.scala  | 269 +++++++++++++++++++
 .../data/storage/elasticsearch/ESPEvents.scala  | 151 +++++++++++
 .../storage/elasticsearch/ESSequences.scala     |  69 ++---
 .../data/storage/elasticsearch/ESUtils.scala    | 159 +++++++++--
 .../storage/elasticsearch/StorageClient.scala   |  24 +-
 tools/build.sbt                                 |   4 +-
 21 files changed, 1518 insertions(+), 560 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/bin/install.sh
----------------------------------------------------------------------
diff --git a/bin/install.sh b/bin/install.sh
index f17cde5..e4fe220 100755
--- a/bin/install.sh
+++ b/bin/install.sh
@@ -19,9 +19,9 @@
 
 OS=`uname`
 PIO_VERSION=0.11.0-SNAPSHOT
-SPARK_VERSION=1.6.2
+SPARK_VERSION=1.6.3
 # Looks like support for Elasticsearch 2.0 will require 2.0 so deferring
-ELASTICSEARCH_VERSION=1.7.5
+ELASTICSEARCH_VERSION=5.1.2
 HBASE_VERSION=1.2.2
 POSTGRES_VERSION=9.4-1204.jdbc41
 MYSQL_VERSION=5.1.37

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/bin/pio-start-all
----------------------------------------------------------------------
diff --git a/bin/pio-start-all b/bin/pio-start-all
index a78b0d2..03e10ae 100755
--- a/bin/pio-start-all
+++ b/bin/pio-start-all
@@ -25,63 +25,73 @@ export PIO_HOME="$(cd `dirname $0`/..; pwd)"
 
 . ${PIO_HOME}/bin/load-pio-env.sh
 
+SOURCE_TYPE=$PIO_STORAGE_REPOSITORIES_METADATA_SOURCE
+SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE
+SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE
+
 # Elasticsearch
-echo "Starting Elasticsearch..."
-if [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME" ]; then
-  if [ -n "$JAVA_HOME" ]; then
-    JPS=`$JAVA_HOME/bin/jps`
+if [ `echo $SOURCE_TYPE | grep -i elasticsearch | wc -l` != 0 ] ; then
+  echo "Starting Elasticsearch..."
+  if [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME" ]; then
+    if [ -n "$JAVA_HOME" ]; then
+      JPS=`$JAVA_HOME/bin/jps`
+    else
+      JPS=`jps`
+    fi
+    if [[ ${JPS} =~ "Elasticsearch" ]]; then
+      echo -e "\033[0;31mElasticsearch is already running. Please use pio-stop-all to try stopping it first.\033[0m"
+      echo -e "\033[0;31mNote: If you started Elasticsearch manually, you will need to kill it manually.\033[0m"
+      echo -e "\033[0;31mAborting...\033[0m"
+      exit 1
+    else
+      $PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME/bin/elasticsearch -d -p $PIO_HOME/es.pid
+    fi
   else
-    JPS=`jps`
-  fi
-  if [[ ${JPS} =~ "Elasticsearch" ]]; then
-    echo -e "\033[0;31mElasticsearch is already running. Please use pio-stop-all to try stopping it first.\033[0m"
-    echo -e "\033[0;31mNote: If you started Elasticsearch manually, you will need to kill it manually.\033[0m"
-    echo -e "\033[0;31mAborting...\033[0m"
+    echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME in conf/pio-env.sh, or in your environment.\033[0m"
+    echo -e "\033[0;31mCannot start Elasticsearch. Aborting...\033[0m"
     exit 1
-  else
-    $PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME/bin/elasticsearch -d -p $PIO_HOME/es.pid
   fi
-else
-  echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME in conf/pio-env.sh, or in your environment.\033[0m"
-  echo -e "\033[0;31mCannot start Elasticsearch. Aborting...\033[0m"
-  exit 1
 fi
 
 # HBase
-echo "Starting HBase..."
-if [ -n "$PIO_STORAGE_SOURCES_HBASE_HOME" ]; then
-  $PIO_STORAGE_SOURCES_HBASE_HOME/bin/start-hbase.sh
-else
-  echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_HBASE_HOME in conf/pio-env.sh, or in your environment.\033[0m"
-  # Kill everything for cleanliness
-  echo -e "\033[0;31mCannot start HBase. Aborting...\033[0m"
-  sleep 3
-  ${PIO_HOME}/bin/pio-stop-all
-  exit 1
+if [ `echo $SOURCE_TYPE | grep -i hbase | wc -l` != 0 ] ; then
+  echo "Starting HBase..."
+  if [ -n "$PIO_STORAGE_SOURCES_HBASE_HOME" ]; then
+    $PIO_STORAGE_SOURCES_HBASE_HOME/bin/start-hbase.sh
+  else
+    echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_HBASE_HOME in conf/pio-env.sh, or in your environment.\033[0m"
+    # Kill everything for cleanliness
+    echo -e "\033[0;31mCannot start HBase. Aborting...\033[0m"
+    sleep 3
+    ${PIO_HOME}/bin/pio-stop-all
+    exit 1
+  fi
 fi
 
 #PGSQL
-pgsqlStatus="$(ps auxwww | grep postgres | wc -l)"
-if [[ "$pgsqlStatus" < 5 ]]; then
-  # Detect OS
-  OS=`uname`
-  if [[ "$OS" = "Darwin" ]]; then
-    pg_cmd=`which pg_ctl`
-    if [[ "$pg_cmd" != "" ]]; then
-      pg_ctl -D /usr/local/var/postgres -l /usr/local/var/postgres/server.log start
+if [ `echo $SOURCE_TYPE | grep -i pgsql | wc -l` != 0 ] ; then
+  pgsqlStatus="$(ps auxwww | grep postgres | wc -l)"
+  if [[ "$pgsqlStatus" < 5 ]]; then
+    # Detect OS
+    OS=`uname`
+    if [[ "$OS" = "Darwin" ]]; then
+      pg_cmd=`which pg_ctl`
+      if [[ "$pg_cmd" != "" ]]; then
+        pg_ctl -D /usr/local/var/postgres -l /usr/local/var/postgres/server.log start
+      fi
+    elif [[ "$OS" = "Linux" ]]; then
+      sudo service postgresql start
+    else
+      echo -e "\033[1;31mYour OS $OS is not yet supported for automatic postgresql startup:(\033[0m"
+      echo -e "\033[1;31mPlease do a manual startup!\033[0m"
+      ${PIO_HOME}/bin/pio-stop-all
+      exit 1
     fi
-  elif [[ "$OS" = "Linux" ]]; then
-    sudo service postgresql start
-  else
-    echo -e "\033[1;31mYour OS $OS is not yet supported for automatic postgresql startup:(\033[0m"
-    echo -e "\033[1;31mPlease do a manual startup!\033[0m"
-    ${PIO_HOME}/bin/pio-stop-all
-    exit 1
   fi
 fi
 
 # PredictionIO Event Server
-echo "Waiting 10 seconds for HBase to fully initialize..."
+echo "Waiting 10 seconds for HBase/Elasticsearch to fully initialize..."
 sleep 10
 echo "Starting PredictionIO Event Server..."
 ${PIO_HOME}/bin/pio-daemon ${PIO_HOME}/eventserver.pid eventserver --ip 0.0.0.0

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/bin/pio-stop-all
----------------------------------------------------------------------
diff --git a/bin/pio-stop-all b/bin/pio-stop-all
index 4aab5a3..dabad5d 100755
--- a/bin/pio-stop-all
+++ b/bin/pio-stop-all
@@ -25,6 +25,10 @@ export PIO_HOME="$(cd `dirname $0`/..; pwd)"
 
 . ${PIO_HOME}/bin/load-pio-env.sh
 
+SOURCE_TYPE=$PIO_STORAGE_REPOSITORIES_METADATA_SOURCE
+SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE
+SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE
+
 # PredictionIO Event Server
 echo "Stopping PredictionIO Event Server..."
 PIDFILE=${PIO_HOME}/eventserver.pid
@@ -34,30 +38,38 @@ if [ -e ${PIDFILE} ]; then
 fi
 
 # HBase
-echo "Stopping HBase..."
-if [ -n "$PIO_STORAGE_SOURCES_HBASE_HOME" ]; then
-  $PIO_STORAGE_SOURCES_HBASE_HOME/bin/stop-hbase.sh
+if [ `echo $SOURCE_TYPE | grep -i hbase | wc -l` != 0 ] ; then
+  echo "Stopping HBase..."
+  if [ -n "$PIO_STORAGE_SOURCES_HBASE_HOME" ]; then
+    $PIO_STORAGE_SOURCES_HBASE_HOME/bin/stop-hbase.sh
+  fi
 fi
 
 # Elasticsearch
-echo "Stopping Elasticsearch..."
-PIDFILE=${PIO_HOME}/es.pid
-if [ -e ${PIDFILE} ]; then
-  cat ${PIDFILE} | xargs kill
-  rm ${PIDFILE}
+if [ `echo $SOURCE_TYPE | grep -i elasticsearch | wc -l` != 0 ] ; then
+  echo "Stopping Elasticsearch..."
+  PIDFILE=${PIO_HOME}/es.pid
+  if [ -e ${PIDFILE} ]; then
+    cat ${PIDFILE} | xargs kill
+    rm ${PIDFILE}
+  fi
 fi
 
 #PGSQL
-OS=`uname`
-if [[ "$OS" = "Darwin" ]]; then
-  pg_cmd=`which pg_ctl`
-  if [[ "$pg_cmd" != "" ]]; then
-    pg_ctl -D /usr/local/var/postgres stop -s -m fast
+if [ `echo $SOURCE_TYPE | grep -i pgsql | wc -l` != 0 ] ; then
+  if [ -n "$PIO_STORAGE_SOURCES_PGSQL_TYPE" ]; then
+    OS=`uname`
+    if [[ "$OS" = "Darwin" ]]; then
+      pg_cmd=`which pg_ctl`
+      if [[ "$pg_cmd" != "" ]]; then
+        pg_ctl -D /usr/local/var/postgres stop -s -m fast
+      fi
+    elif [[ "$OS" = "Linux" ]]; then
+      sudo service postgresql stop
+    else
+      echo -e "\033[1;31mYour OS $OS is not yet supported for automatic postgresql startup:(\033[0m"
+      echo -e "\033[1;31mPlease do a manual shutdown!\033[0m"
+      exit 1
+    fi
   fi
-elif [[ "$OS" = "Linux" ]]; then
-  sudo service postgresql stop
-else
-  echo -e "\033[1;31mYour OS $OS is not yet supported for automatic postgresql startup:(\033[0m"
-  echo -e "\033[1;31mPlease do a manual shutdown!\033[0m"
-  exit 1
 fi

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 21c31c6..4d6a5b6 100644
--- a/build.sbt
+++ b/build.sbt
@@ -34,11 +34,11 @@ fork in (ThisBuild, run) := true
 javacOptions in (ThisBuild, compile) ++= Seq("-source", "1.7", "-target", "1.7",
   "-Xlint:deprecation", "-Xlint:unchecked")
 
-elasticsearchVersion in ThisBuild := "1.4.4"
+elasticsearchVersion in ThisBuild := "5.1.2"
 
 json4sVersion in ThisBuild := "3.2.10"
 
-sparkVersion in ThisBuild := "1.4.0"
+sparkVersion in ThisBuild := "1.6.3"
 
 lazy val pioBuildInfoSettings = buildInfoSettings ++ Seq(
   sourceGenerators in Compile <+= buildInfo,

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/conf/pio-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template
index a06cd8e..a2841e3 100644
--- a/conf/pio-env.sh.template
+++ b/conf/pio-env.sh.template
@@ -24,7 +24,7 @@
 # you need to change these to fit your site.
 
 # SPARK_HOME: Apache Spark is a hard dependency and must be configured.
-SPARK_HOME=$PIO_HOME/vendors/spark-1.5.1-bin-hadoop2.6
+SPARK_HOME=$PIO_HOME/vendors/spark-1.6.3-bin-hadoop2.6
 
 POSTGRES_JDBC_DRIVER=$PIO_HOME/lib/postgresql-9.4-1204.jdbc41.jar
 MYSQL_JDBC_DRIVER=$PIO_HOME/lib/mysql-connector-java-5.1.37.jar
@@ -85,10 +85,10 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio
 
 # Elasticsearch Example
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch
-# PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=<elasticsearch_cluster_name>
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost
-# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9300
-# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-1.4.4
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2
 
 # Local File System Example
 # PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/core/build.sbt
----------------------------------------------------------------------
diff --git a/core/build.sbt b/core/build.sbt
index 637d4ea..abd8e07 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -18,27 +18,27 @@
 name := "apache-predictionio-core"
 
 libraryDependencies ++= Seq(
-  "com.github.scopt"       %% "scopt"            % "3.3.0",
-  "com.google.code.gson"    % "gson"             % "2.5",
-  "com.google.guava"        % "guava"            % "18.0",
-  "com.twitter"            %% "chill"            % "0.7.2"
+  "com.github.scopt"        %% "scopt"            % "3.3.0",
+  "com.google.code.gson"     % "gson"             % "2.5",
+  "com.google.guava"         % "guava"            % "18.0",
+  "com.twitter"             %% "chill"            % "0.7.2"
     exclude("com.esotericsoftware.minlog", "minlog"),
-  "com.twitter"            %% "chill-bijection"  % "0.7.2",
-  "de.javakaffee"           % "kryo-serializers" % "0.37",
-  "commons-io"              % "commons-io"       % "2.4",
-  "io.spray"               %% "spray-can"        % "1.3.3",
-  "io.spray"               %% "spray-routing"    % "1.3.3",
-  "net.jodah"               % "typetools"        % "0.3.1",
-  "org.apache.spark"       %% "spark-core"       % sparkVersion.value % "provided",
-  "org.apache.spark"       %% "spark-sql"        % sparkVersion.value % "provided",
-  "org.clapper"            %% "grizzled-slf4j"   % "1.0.2",
-  "org.elasticsearch"       % "elasticsearch"    % elasticsearchVersion.value,
-  "org.json4s"             %% "json4s-native"    % json4sVersion.value,
-  "org.json4s"             %% "json4s-ext"       % json4sVersion.value,
-  "org.scalaj"             %% "scalaj-http"      % "1.1.6",
-  "org.scalatest"          %% "scalatest"        % "2.1.7" % "test",
-  "org.slf4j"               % "slf4j-log4j12"    % "1.7.18",
-  "org.specs2"             %% "specs2"           % "2.3.13" % "test")
+  "com.twitter"             %% "chill-bijection"  % "0.7.2",
+  "de.javakaffee"            % "kryo-serializers" % "0.37",
+  "commons-io"               % "commons-io"       % "2.4",
+  "io.spray"                %% "spray-can"        % "1.3.3",
+  "io.spray"                %% "spray-routing"    % "1.3.3",
+  "net.jodah"                % "typetools"        % "0.3.1",
+  "org.apache.spark"        %% "spark-core"       % sparkVersion.value % "provided",
+  "org.apache.spark"        %% "spark-sql"        % sparkVersion.value % "provided",
+  "org.clapper"             %% "grizzled-slf4j"   % "1.0.2",
+  "org.elasticsearch.client" % "rest"             % elasticsearchVersion.value,
+  "org.json4s"              %% "json4s-native"    % json4sVersion.value,
+  "org.json4s"              %% "json4s-ext"       % json4sVersion.value,
+  "org.scalaj"              %% "scalaj-http"      % "1.1.6",
+  "org.scalatest"           %% "scalatest"        % "2.1.7" % "test",
+  "org.slf4j"                % "slf4j-log4j12"    % "1.7.18",
+  "org.specs2"              %% "specs2"           % "2.3.13" % "test")
 
 //testOptions := Seq(Tests.Filter(s => Seq("Dev").exists(s.contains(_))))
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
index 899ace2..edfc1b6 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
@@ -223,36 +223,40 @@ object CreateWorkflow extends Logging {
         engineFactoryObj.engineParams(wfc.engineParamsKey)
       }
 
-      val engineInstance = EngineInstance(
-        id = "",
-        status = "INIT",
-        startTime = DateTime.now,
-        endTime = DateTime.now,
-        engineId = wfc.engineId,
-        engineVersion = wfc.engineVersion,
-        engineVariant = variantId,
-        engineFactory = engineFactory,
-        batch = wfc.batch,
-        env = pioEnvVars,
-        sparkConf = workflowParams.sparkEnv,
-        dataSourceParams =
-          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
-        preparatorParams =
-          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
-        algorithmsParams =
-          JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
-        servingParams =
-          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
+      try {
+        val engineInstance = EngineInstance(
+          id = "",
+          status = "INIT",
+          startTime = DateTime.now,
+          endTime = DateTime.now,
+          engineId = wfc.engineId,
+          engineVersion = wfc.engineVersion,
+          engineVariant = variantId,
+          engineFactory = engineFactory,
+          batch = wfc.batch,
+          env = pioEnvVars,
+          sparkConf = workflowParams.sparkEnv,
+          dataSourceParams =
+            JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
+          preparatorParams =
+            JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
+          algorithmsParams =
+            JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
+          servingParams =
+            JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
 
-      val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
-        engineInstance)
+        val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
+          engineInstance)
 
-      CoreWorkflow.runTrain(
-        env = pioEnvVars,
-        params = workflowParams,
-        engine = trainableEngine,
-        engineParams = engineParams,
-        engineInstance = engineInstance.copy(id = engineInstanceId))
+        CoreWorkflow.runTrain(
+          env = pioEnvVars,
+          params = workflowParams,
+          engine = trainableEngine,
+          engineParams = engineParams,
+          engineInstance = engineInstance.copy(id = engineInstanceId))
+      } finally {
+        Storage.getLEvents().close()
+      }
     } else {
       val workflowParams = WorkflowParams(
         verbose = wfc.verbosity,
@@ -267,11 +271,15 @@ object CreateWorkflow extends Logging {
         env = pioEnvVars,
         sparkConf = workflowParams.sparkEnv
       )
-      Workflow.runEvaluation(
-        evaluation = evaluation.get,
-        engineParamsGenerator = engineParamsGenerator.get,
-        evaluationInstance = evaluationInstance,
-        params = workflowParams)
+      try {
+        Workflow.runEvaluation(
+          evaluation = evaluation.get,
+          engineParamsGenerator = engineParamsGenerator.get,
+          evaluationInstance = evaluationInstance,
+          params = workflowParams)
+      } finally {
+        Storage.getLEvents().close()
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/build.sbt
----------------------------------------------------------------------
diff --git a/data/build.sbt b/data/build.sbt
index 4526c39..4ae9b42 100644
--- a/data/build.sbt
+++ b/data/build.sbt
@@ -18,41 +18,44 @@
 name := "apache-predictionio-data"
 
 libraryDependencies ++= Seq(
-  "com.github.nscala-time" %% "nscala-time"    % "2.6.0",
-  "commons-codec"           % "commons-codec"  % "1.9",
-  "io.spray"               %% "spray-can"      % "1.3.3",
-  "io.spray"               %% "spray-routing"  % "1.3.3",
-  "io.spray"               %% "spray-testkit"  % "1.3.3" % "test",
-  "mysql"                   % "mysql-connector-java" % "5.1.37" % "optional",
-  "org.apache.hadoop"       % "hadoop-common"  % "2.6.2"
+  "com.github.nscala-time"  %% "nscala-time"    % "2.6.0",
+  "commons-codec"            % "commons-codec"  % "1.9",
+  "io.spray"                %% "spray-can"      % "1.3.3",
+  "io.spray"                %% "spray-routing"  % "1.3.3",
+  "io.spray"                %% "spray-testkit"  % "1.3.3" % "test",
+  "mysql"                    % "mysql-connector-java" % "5.1.37" % "optional",
+  "org.apache.hadoop"        % "hadoop-common"  % "2.6.2"
     exclude("javax.servlet", "servlet-api"),
-  "org.apache.hbase"        % "hbase-common"   % "0.98.5-hadoop2",
-  "org.apache.hbase"        % "hbase-client"   % "0.98.5-hadoop2"
+  "org.apache.hbase"         % "hbase-common"   % "0.98.5-hadoop2",
+  "org.apache.hbase"         % "hbase-client"   % "0.98.5-hadoop2"
     exclude("org.apache.zookeeper", "zookeeper"),
   // added for Parallel storage interface
-  "org.apache.hbase"        % "hbase-server"   % "0.98.5-hadoop2"
+  "org.apache.hbase"         % "hbase-server"   % "0.98.5-hadoop2"
     exclude("org.apache.hbase", "hbase-client")
     exclude("org.apache.zookeeper", "zookeeper")
     exclude("javax.servlet", "servlet-api")
     exclude("org.mortbay.jetty", "servlet-api-2.5")
     exclude("org.mortbay.jetty", "jsp-api-2.1")
     exclude("org.mortbay.jetty", "jsp-2.1"),
-  "org.apache.zookeeper"    % "zookeeper"      % "3.4.7"
+  "org.apache.zookeeper"     % "zookeeper"      % "3.4.7"
     exclude("org.slf4j", "slf4j-api")
     exclude("org.slf4j", "slf4j-log4j12"),
-  "org.apache.spark"       %% "spark-core"     % sparkVersion.value % "provided",
-  "org.apache.spark"       %% "spark-sql"      % sparkVersion.value % "provided",
-  "org.clapper"            %% "grizzled-slf4j" % "1.0.2",
-  "org.elasticsearch"       % "elasticsearch"  % elasticsearchVersion.value,
-  "org.json4s"             %% "json4s-native"  % json4sVersion.value,
-  "org.json4s"             %% "json4s-ext"     % json4sVersion.value,
-  "org.postgresql"          % "postgresql"     % "9.4-1204-jdbc41",
-  "org.scalatest"          %% "scalatest"      % "2.1.7" % "test",
-  "org.scalikejdbc"        %% "scalikejdbc"    % "2.3.5",
-  "org.slf4j"               % "slf4j-log4j12"  % "1.7.18",
-  "org.spark-project.akka" %% "akka-actor"     % "2.3.4-spark",
-  "org.specs2"             %% "specs2"         % "2.3.13" % "test")
+  "org.apache.spark"        %% "spark-core"     % sparkVersion.value % "provided",
+  "org.apache.spark"        %% "spark-sql"      % sparkVersion.value % "provided",
+  "org.clapper"             %% "grizzled-slf4j" % "1.0.2",
+  "org.elasticsearch.client" % "rest"           % elasticsearchVersion.value,
+  "org.elasticsearch"        % "elasticsearch-spark-13_2.10" % elasticsearchVersion.value % "provided",
+  "org.elasticsearch"        % "elasticsearch-hadoop-mr" % elasticsearchVersion.value,
+  "org.json4s"              %% "json4s-native"  % json4sVersion.value,
+  "org.json4s"              %% "json4s-ext"     % json4sVersion.value,
+  "org.postgresql"           % "postgresql"     % "9.4-1204-jdbc41",
+  "org.scalatest"           %% "scalatest"      % "2.1.7" % "test",
+  "org.scalikejdbc"         %% "scalikejdbc"    % "2.3.5",
+  "org.slf4j"                % "slf4j-log4j12"  % "1.7.18",
+  "org.spark-project.akka"  %% "akka-actor"     % "2.3.4-spark",
+  "org.specs2"              %% "specs2"         % "2.3.13" % "test")
 
 parallelExecution in Test := false
 
 pomExtra := childrenPomExtra.value
+

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 077168a..2c69cf4 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -15,45 +15,41 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.data.storage.elasticsearch
 
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.StorageClientConfig
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.AccessKey
 import org.apache.predictionio.data.storage.AccessKeys
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.json4s.JsonDSL._
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
 import org.json4s._
+import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
 import org.json4s.native.Serialization.write
 
-import scala.util.Random
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
 
 /** Elasticsearch implementation of AccessKeys. */
-class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
+class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: String)
     extends AccessKeys with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "accesskeys"
 
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
+  ESUtils.createIndex(client, index)
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> 0)) ~
+      ("properties" ->
+        ("key" -> ("type" -> "keyword")) ~
+        ("events" -> ("type" -> "keyword"))))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def insert(accessKey: AccessKey): Option[String] = {
     val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
@@ -61,59 +57,99 @@ class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
     Some(key)
   }
 
-  def get(key: String): Option[AccessKey] = {
+  def get(id: String): Option[AccessKey] = {
     try {
-      val response = client.prepareGet(
-        index,
-        estype,
-        key).get()
-      Some(read[AccessKey](response.getSourceAsString))
+      val response = client.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "found").extract[Boolean] match {
+        case true =>
+          Some((jsonResponse \ "_source").extract[AccessKey])
+        case _ =>
+          None
+      }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error("Failed to access to /$index/$estype/$key", e)
         None
-      case e: NullPointerException => None
     }
   }
 
   def getAll(): Seq[AccessKey] = {
     try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[AccessKey](client, builder)
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq[AccessKey]()
+      case e: IOException =>
+        error("Failed to access to /$index/$estype/_search", e)
+        Nil
     }
   }
 
   def getByAppid(appid: Int): Seq[AccessKey] = {
     try {
-      val builder = client.prepareSearch(index).setTypes(estype).
-        setPostFilter(termFilter("appid", appid))
-      ESUtils.getAll[AccessKey](client, builder)
+      val json =
+        ("query" ->
+          ("term" ->
+            ("appid" -> appid)))
+      ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq[AccessKey]()
+      case e: IOException =>
+        error("Failed to access to /$index/$estype/_search", e)
+        Nil
     }
   }
 
   def update(accessKey: AccessKey): Unit = {
+    val id = accessKey.key
     try {
-      client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get()
+      val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON)
+      val response = client.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
     }
   }
 
-  def delete(key: String): Unit = {
+  def delete(id: String): Unit = {
     try {
-      client.prepareDelete(index, estype, key).get
+      val response = client.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/id")
+      }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/id", e)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index 3781a4b..7a65379 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -15,116 +15,160 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.data.storage.elasticsearch
 
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.StorageClientConfig
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.App
 import org.apache.predictionio.data.storage.Apps
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.json4s.JsonDSL._
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
 import org.json4s._
+import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
 import org.json4s.native.Serialization.write
 
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
 /** Elasticsearch implementation of Items. */
-class ESApps(client: Client, config: StorageClientConfig, index: String)
-  extends Apps with Logging {
+class ESApps(client: RestClient, config: StorageClientConfig, index: String)
+    extends Apps with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "apps"
   private val seq = new ESSequences(client, config, index)
 
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
+  ESUtils.createIndex(client, index)
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> 0))~
+      ("properties" ->
+        ("id" -> ("type" -> "keyword")) ~
+        ("name" -> ("type" -> "keyword"))))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def insert(app: App): Option[Int] = {
     val id =
       if (app.id == 0) {
-        var roll = seq.genNext("apps")
-        while (!get(roll).isEmpty) roll = seq.genNext("apps")
+        var roll = seq.genNext(estype)
+        while (!get(roll).isEmpty) roll = seq.genNext(estype)
         roll
-      }
-      else app.id
-    val realapp = app.copy(id = id)
-    update(realapp)
+      } else app.id
+    update(app.copy(id = id))
     Some(id)
   }
 
   def get(id: Int): Option[App] = {
     try {
-      val response = client.prepareGet(
-        index,
-        estype,
-        id.toString).get()
-      Some(read[App](response.getSourceAsString))
+      val response = client.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "found").extract[Boolean] match {
+        case true =>
+          Some((jsonResponse \ "_source").extract[App])
+        case _ =>
+          None
+      }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
         None
-      case e: NullPointerException => None
     }
   }
 
   def getByName(name: String): Option[App] = {
     try {
-      val response = client.prepareSearch(index).setTypes(estype).
-        setPostFilter(termFilter("name", name)).get
-      val hits = response.getHits().hits()
-      if (hits.size > 0) {
-        Some(read[App](hits.head.getSourceAsString))
-      } else {
-        None
+      val json =
+        ("query" ->
+          ("term" ->
+            ("name" -> name)))
+      val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+      val response = client.performRequest(
+        "POST",
+        s"/$index/$estype/_search",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "hits" \ "total").extract[Long] match {
+        case 0 => None
+        case _ =>
+          val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]]
+          val result = (results.head \ "_source").extract[App]
+          Some(result)
       }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
         None
     }
   }
 
   def getAll(): Seq[App] = {
     try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[App](client, builder)
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[App](client, index, estype, compact(render(json)))
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq[App]()
+      case e: IOException =>
+        error("Failed to access to /$index/$estype/_search", e)
+        Nil
     }
   }
 
   def update(app: App): Unit = {
+    val id = app.id.toString
     try {
-      val response = client.prepareIndex(index, estype, app.id.toString).
-        setSource(write(app)).get()
+      val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON);
+      val response = client.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
     }
   }
 
   def delete(id: Int): Unit = {
     try {
-      client.prepareDelete(index, estype, id.toString).get
+      val response = client.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/id", e)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index 52697fd..c90d668 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -15,102 +15,134 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.data.storage.elasticsearch
 
-import grizzled.slf4j.Logging
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.Channel
 import org.apache.predictionio.data.storage.Channels
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders.termFilter
-import org.json4s.DefaultFormats
+import org.elasticsearch.client.RestClient
+import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
 import org.json4s.native.Serialization.write
 
-class ESChannels(client: Client, config: StorageClientConfig, index: String)
-    extends Channels with Logging {
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
 
+class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
+    extends Channels with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "channels"
   private val seq = new ESSequences(client, config, index)
-  private val seqName = "channels"
 
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
+  ESUtils.createIndex(client, index)
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> 0))~
+      ("properties" ->
+        ("name" -> ("type" -> "keyword"))))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def insert(channel: Channel): Option[Int] = {
     val id =
       if (channel.id == 0) {
-        var roll = seq.genNext(seqName)
-        while (!get(roll).isEmpty) roll = seq.genNext(seqName)
+        var roll = seq.genNext(estype)
+        while (!get(roll).isEmpty) roll = seq.genNext(estype)
         roll
       } else channel.id
 
-    val realChannel = channel.copy(id = id)
-    if (update(realChannel)) Some(id) else None
+    if (update(channel.copy(id = id))) Some(id) else None
   }
 
   def get(id: Int): Option[Channel] = {
     try {
-      val response = client.prepareGet(
-        index,
-        estype,
-        id.toString).get()
-      Some(read[Channel](response.getSourceAsString))
+      val response = client.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "found").extract[Boolean] match {
+        case true =>
+          Some((jsonResponse \ "_source").extract[Channel])
+        case _ =>
+          None
+      }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
         None
-      case e: NullPointerException => None
     }
   }
 
   def getByAppid(appid: Int): Seq[Channel] = {
     try {
-      val builder = client.prepareSearch(index).setTypes(estype).
-        setPostFilter(termFilter("appid", appid))
-      ESUtils.getAll[Channel](client, builder)
+      val json =
+        ("query" ->
+          ("term" ->
+            ("appid" -> appid)))
+      ESUtils.getAll[Channel](client, index, estype, compact(render(json)))
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq[Channel]()
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
     }
   }
 
   def update(channel: Channel): Boolean = {
+    val id = channel.id.toString
     try {
-      val response = client.prepareIndex(index, estype, channel.id.toString).
-        setSource(write(channel)).get()
-      true
+      val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON)
+      val response = client.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava,
+        entity)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "created" => true
+        case "updated" => true
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+          false
+      }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
         false
     }
   }
 
   def delete(id: Int): Unit = {
     try {
-      client.prepareDelete(index, estype, id.toString).get
+      val response = client.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index 21690bf..08f87f3 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -15,144 +15,211 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.data.storage.elasticsearch
 
-import grizzled.slf4j.Logging
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.EngineInstance
 import org.apache.predictionio.data.storage.EngineInstanceSerializer
 import org.apache.predictionio.data.storage.EngineInstances
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.elasticsearch.search.sort.SortOrder
-import org.json4s.JsonDSL._
+import org.elasticsearch.client.RestClient
 import org.json4s._
+import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
 import org.json4s.native.Serialization.write
 
-class ESEngineInstances(client: Client, config: StorageClientConfig, index: String)
-  extends EngineInstances with Logging {
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: String)
+    extends EngineInstances with Logging {
   implicit val formats = DefaultFormats + new EngineInstanceSerializer
   private val estype = "engine_instances"
 
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("startTime" -> ("type" -> "date")) ~
-          ("endTime" -> ("type" -> "date")) ~
-          ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("engineVersion" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("engineVariant" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("engineFactory" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("batch" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("dataSourceParams" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("preparatorParams" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("algorithmsParams" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("servingParams" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
+  ESUtils.createIndex(client, index)
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> 0))~
+      ("properties" ->
+        ("status" -> ("type" -> "keyword")) ~
+        ("startTime" -> ("type" -> "date")) ~
+        ("endTime" -> ("type" -> "date")) ~
+        ("engineId" -> ("type" -> "keyword")) ~
+        ("engineVersion" -> ("type" -> "keyword")) ~
+        ("engineVariant" -> ("type" -> "keyword")) ~
+        ("engineFactory" -> ("type" -> "keyword")) ~
+        ("batch" -> ("type" -> "keyword")) ~
+        ("dataSourceParams" -> ("type" -> "keyword")) ~
+        ("preparatorParams" -> ("type" -> "keyword")) ~
+        ("algorithmsParams" -> ("type" -> "keyword")) ~
+        ("servingParams" -> ("type" -> "keyword")) ~
+        ("status" -> ("type" -> "keyword"))))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def insert(i: EngineInstance): String = {
+    val id = i.id match {
+      case x if x.isEmpty =>
+        @scala.annotation.tailrec
+        def generateId(newId: Option[String]): String = {
+          newId match {
+            case Some(x) => x
+            case _ => generateId(preInsert())
+          }
+        }
+        generateId(preInsert())
+      case x => x
+    }
+
+    update(i.copy(id = id))
+    id
+  }
+
+  def preInsert(): Option[String] = {
     try {
-      val response = client.prepareIndex(index, estype).
-        setSource(write(i)).get
-      response.getId
+      val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
+      val response = client.performRequest(
+        "POST",
+        s"/$index/$estype/",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+          Some((jsonResponse \ "_id").extract[String])
+        case _ =>
+          error(s"[$result] Failed to create $index/$estype")
+          None
+      }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        ""
+      case e: IOException =>
+        error(s"Failed to create $index/$estype", e)
+        None
     }
   }
 
   def get(id: String): Option[EngineInstance] = {
     try {
-      val response = client.prepareGet(index, estype, id).get
-      if (response.isExists) {
-        Some(read[EngineInstance](response.getSourceAsString))
-      } else {
-        None
+      val response = client.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "found").extract[Boolean] match {
+        case true =>
+          Some((jsonResponse \ "_source").extract[EngineInstance])
+        case _ =>
+          None
       }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
         None
     }
   }
 
   def getAll(): Seq[EngineInstance] = {
     try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[EngineInstance](client, builder)
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
+      case e: IOException =>
+        error("Failed to access to /$index/$estype/_search", e)
+        Nil
     }
   }
 
   def getCompleted(
-      engineId: String,
-      engineVersion: String,
-      engineVariant: String): Seq[EngineInstance] = {
+    engineId: String,
+    engineVersion: String,
+    engineVariant: String): Seq[EngineInstance] = {
     try {
-      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
-        andFilter(
-          termFilter("status", "COMPLETED"),
-          termFilter("engineId", engineId),
-          termFilter("engineVersion", engineVersion),
-          termFilter("engineVariant", engineVariant))).
-        addSort("startTime", SortOrder.DESC)
-      ESUtils.getAll[EngineInstance](client, builder)
+      val json =
+        ("query" ->
+          ("bool" ->
+            ("must" -> List(
+              ("term" ->
+                ("status" -> "COMPLETED")),
+              ("term" ->
+                ("engineId" -> engineId)),
+              ("term" ->
+                ("engineVersion" -> engineVersion)),
+              ("term" ->
+                ("engineVariant" -> engineVariant)))))) ~
+              ("sort" -> List(
+                ("startTime" ->
+                  ("order" -> "desc"))))
+      ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
     }
   }
 
   def getLatestCompleted(
-      engineId: String,
-      engineVersion: String,
-      engineVariant: String): Option[EngineInstance] =
+    engineId: String,
+    engineVersion: String,
+    engineVariant: String): Option[EngineInstance] =
     getCompleted(
       engineId,
       engineVersion,
       engineVariant).headOption
 
   def update(i: EngineInstance): Unit = {
+    val id = i.id
     try {
-      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+      val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
+      val response = client.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
     } catch {
-      case e: ElasticsearchException => error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
     }
   }
 
   def delete(id: String): Unit = {
     try {
-      val response = client.prepareDelete(index, estype, id).get
+      val response = client.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
     } catch {
-      case e: ElasticsearchException => error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
index 65b6691..a965c71 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
@@ -15,59 +15,96 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.data.storage.elasticsearch
 
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.EngineManifestSerializer
-import org.apache.predictionio.data.storage.StorageClientConfig
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.data.storage.EngineManifestSerializer
 import org.apache.predictionio.data.storage.EngineManifests
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
 import org.json4s._
-import org.json4s.native.Serialization.read
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization.write
 
-class ESEngineManifests(client: Client, config: StorageClientConfig, index: String)
-  extends EngineManifests with Logging {
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+class ESEngineManifests(client: RestClient, config: StorageClientConfig, index: String)
+    extends EngineManifests with Logging {
   implicit val formats = DefaultFormats + new EngineManifestSerializer
   private val estype = "engine_manifests"
-  private def esid(id: String, version: String) = s"$id $version"
+  private def esid(id: String, version: String) = s"$id-$version"
 
   def insert(engineManifest: EngineManifest): Unit = {
-    val json = write(engineManifest)
-    val response = client.prepareIndex(
-      index,
-      estype,
-      esid(engineManifest.id, engineManifest.version)).
-      setSource(json).execute().actionGet()
+    val id = esid(engineManifest.id, engineManifest.version)
+    try {
+      val entity = new NStringEntity(write(engineManifest), ContentType.APPLICATION_JSON)
+      val response = client.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
+    }
   }
 
   def get(id: String, version: String): Option[EngineManifest] = {
+    val esId = esid(id, version)
     try {
-      val response = client.prepareGet(index, estype, esid(id, version)).
-        execute().actionGet()
-      if (response.isExists) {
-        Some(read[EngineManifest](response.getSourceAsString))
-      } else {
-        None
+      val response = client.performRequest(
+        "GET",
+        s"/$index/$estype/$esId",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "found").extract[Boolean] match {
+        case true =>
+          Some((jsonResponse \ "_source").extract[EngineManifest])
+        case _ =>
+          None
       }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$esId", e)
         None
     }
+
   }
 
   def getAll(): Seq[EngineManifest] = {
     try {
-      val builder = client.prepareSearch()
-      ESUtils.getAll[EngineManifest](client, builder)
+      val json =
+        ("query" ->
+          ("match_all" ->  List.empty))
+      ESUtils.getAll[EngineManifest](client, index, estype, compact(render(json)))
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
+      case e: IOException =>
+        error("Failed to access to /$index/$estype/_search", e)
+        Nil
     }
   }
 
@@ -75,10 +112,22 @@ class ESEngineManifests(client: Client, config: StorageClientConfig, index: Stri
     insert(engineManifest)
 
   def delete(id: String, version: String): Unit = {
+    val esId = esid(id, version)
     try {
-      client.prepareDelete(index, estype, esid(id, version)).execute().actionGet()
+      val response = client.performRequest(
+        "DELETE",
+        s"/$index/$estype/$esId",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$esId")
+      }
     } catch {
-      case e: ElasticsearchException => error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$esId", e)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 85bf820..0e71f79 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -15,122 +15,160 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.data.storage.elasticsearch
 
-import grizzled.slf4j.Logging
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.EvaluationInstance
 import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
 import org.apache.predictionio.data.storage.EvaluationInstances
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.elasticsearch.search.sort.SortOrder
-import org.json4s.JsonDSL._
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
 import org.json4s._
+import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
 import org.json4s.native.Serialization.write
 
-class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String)
-  extends EvaluationInstances with Logging {
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, index: String)
+    extends EvaluationInstances with Logging {
   implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
   private val estype = "evaluation_instances"
+  private val seq = new ESSequences(client, config, index)
 
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("startTime" -> ("type" -> "date")) ~
-          ("endTime" -> ("type" -> "date")) ~
-          ("evaluationClass" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("engineParamsGeneratorClass" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("batch" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("evaluatorResults" ->
-            ("type" -> "string") ~ ("index" -> "no")) ~
-          ("evaluatorResultsHTML" ->
-            ("type" -> "string") ~ ("index" -> "no")) ~
-          ("evaluatorResultsJSON" ->
-            ("type" -> "string") ~ ("index" -> "no"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
+  ESUtils.createIndex(client, index)
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> 0))~
+      ("properties" ->
+        ("status" -> ("type" -> "keyword")) ~
+        ("startTime" -> ("type" -> "date")) ~
+        ("endTime" -> ("type" -> "date")) ~
+        ("evaluationClass" -> ("type" -> "keyword")) ~
+        ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
+        ("batch" -> ("type" -> "keyword")) ~
+        ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~
+        ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~
+        ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no"))))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def insert(i: EvaluationInstance): String = {
-    try {
-      val response = client.prepareIndex(index, estype).
-        setSource(write(i)).get
-      response.getId
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        ""
+    val id = i.id match {
+      case x if x.isEmpty =>
+        var roll = seq.genNext(estype).toString
+        while (!get(roll).isEmpty) roll = seq.genNext(estype).toString
+        roll
+      case x => x
     }
+
+    update(i.copy(id = id))
+    id
   }
 
   def get(id: String): Option[EvaluationInstance] = {
     try {
-      val response = client.prepareGet(index, estype, id).get
-      if (response.isExists) {
-        Some(read[EvaluationInstance](response.getSourceAsString))
-      } else {
-        None
+      val response = client.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "found").extract[Boolean] match {
+        case true =>
+          Some((jsonResponse \ "_source").extract[EvaluationInstance])
+        case _ =>
+          None
       }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
         None
     }
   }
 
   def getAll(): Seq[EvaluationInstance] = {
     try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[EvaluationInstance](client, builder)
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
+      case e: IOException =>
+        error("Failed to access to /$index/$estype/_search", e)
+        Nil
     }
   }
 
   def getCompleted(): Seq[EvaluationInstance] = {
     try {
-      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
-        termFilter("status", "EVALCOMPLETED")).
-        addSort("startTime", SortOrder.DESC)
-      ESUtils.getAll[EvaluationInstance](client, builder)
+      val json =
+        ("query" ->
+          ("term" ->
+            ("status" -> "EVALCOMPLETED"))) ~
+            ("sort" ->
+              ("startTime" ->
+                ("order" -> "desc")))
+      ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
+      case e: IOException =>
+        error("Failed to access to /$index/$estype/_search", e)
+        Nil
     }
   }
 
   def update(i: EvaluationInstance): Unit = {
+    val id = i.id
     try {
-      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+      val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
+      val response = client.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava,
+        entity)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
     } catch {
-      case e: ElasticsearchException => error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
     }
   }
 
   def delete(id: String): Unit = {
     try {
-      client.prepareDelete(index, estype, id).get
+      val response = client.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
     } catch {
-      case e: ElasticsearchException => error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
new file mode 100644
index 0000000..f2ab7c2
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.MapWritable
+import org.apache.hadoop.io.Text
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+import org.json4s._
+
+object ESEventsUtil {
+
+  implicit val formats = DefaultFormats
+
+  def resultToEvent(id: Text, result: MapWritable, appId: Int): Event = {
+
+    def getStringCol(col: String): String = {
+      val r = result.get(new Text(col)).asInstanceOf[Text]
+      require(r != null,
+        s"Failed to get value for column ${col}. " +
+          s"StringBinary: ${r.getBytes()}.")
+
+      r.toString()
+    }
+
+    def getOptStringCol(col: String): Option[String] = {
+      val r = result.get(new Text(col))
+      if (r == null) {
+        None
+      } else {
+        Some(r.asInstanceOf[Text].toString())
+      }
+    }
+
+    val tmp = result
+      .get(new Text("properties")).asInstanceOf[MapWritable]
+      .get(new Text("fields")).asInstanceOf[MapWritable]
+      .get(new Text("rating"))
+
+    val rating =
+      if (tmp.isInstanceOf[DoubleWritable]) tmp.asInstanceOf[DoubleWritable]
+      else if (tmp.isInstanceOf[LongWritable]) {
+        new DoubleWritable(tmp.asInstanceOf[LongWritable].get().toDouble)
+      }
+      else null
+
+    val properties: DataMap =
+      if (rating != null) DataMap(s"""{"rating":${rating.get().toString}}""")
+      else DataMap()
+
+
+    val eventId = Some(getStringCol("eventId"))
+    val event = getStringCol("event")
+    val entityType = getStringCol("entityType")
+    val entityId = getStringCol("entityId")
+    val targetEntityType = getOptStringCol("targetEntityType")
+    val targetEntityId = getOptStringCol("targetEntityId")
+    val prId = getOptStringCol("prId")
+    val eventTimeZone = getOptStringCol("eventTimeZone")
+      .map(DateTimeZone.forID(_))
+      .getOrElse(EventValidation.defaultTimeZone)
+    val eventTime = new DateTime(
+      getStringCol("eventTime"), eventTimeZone)
+    val creationTimeZone = getOptStringCol("creationTimeZone")
+      .map(DateTimeZone.forID(_))
+      .getOrElse(EventValidation.defaultTimeZone)
+    val creationTime: DateTime = new DateTime(
+      getStringCol("creationTime"), creationTimeZone)
+
+
+    Event(
+      eventId = eventId,
+      event = event,
+      entityType = entityType,
+      entityId = entityId,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      properties = properties,
+      eventTime = eventTime,
+      tags = Seq(),
+      prId = prId,
+      creationTime = creationTime
+    )
+  }
+
+  def eventToPut(event: Event, appId: Int): Seq[Map[String, Any]] = {
+    Seq(
+      Map(
+        "eventId" -> event.eventId,
+        "event" -> event.event,
+        "entityType" -> event.entityType,
+        "entityId" -> event.entityId,
+        "targetEntityType" -> event.targetEntityType,
+        "targetEntityId" -> event.targetEntityId,
+        "properties" -> event.properties,
+        "eventTime" -> event.eventTime,
+        "tags" -> event.tags,
+        "prId" -> event.prId,
+        "creationTime" -> event.creationTime
+      )
+    )
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
new file mode 100644
index 0000000..ef25204
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.joda.time.DateTime
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+import org.json4s.ext.JodaTimeSerializers
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String)
+    extends LEvents with Logging {
+  implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
+  private val seq = new ESSequences(client, config, index)
+  private val seqName = "events"
+
+  def getEsType(appId: Int, channelId: Option[Int] = None): String = {
+    channelId.map { ch =>
+      s"${appId}_${ch}"
+    }.getOrElse {
+      s"${appId}"
+    }
+  }
+
+  override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+    val estype = getEsType(appId, channelId)
+    ESUtils.createIndex(client, index)
+    val json =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("name" -> ("type" -> "keyword")) ~
+          ("eventId" -> ("type" -> "keyword")) ~
+          ("event" -> ("type" -> "keyword")) ~
+          ("entityType" -> ("type" -> "keyword")) ~
+          ("entityId" -> ("type" -> "keyword")) ~
+          ("targetEntityType" -> ("type" -> "keyword")) ~
+          ("targetEntityId" -> ("type" -> "keyword")) ~
+          ("properties" ->
+            ("type" -> "nested") ~
+            ("properties" ->
+              ("fields" -> ("type" -> "nested") ~
+                ("properties" ->
+                  ("user" -> ("type" -> "long")) ~
+                  ("num" -> ("type" -> "long")))))) ~
+                  ("eventTime" -> ("type" -> "date")) ~
+                  ("tags" -> ("type" -> "keyword")) ~
+                  ("prId" -> ("type" -> "keyword")) ~
+                  ("creationTime" -> ("type" -> "date"))))
+    ESUtils.createMapping(client, index, estype, compact(render(json)))
+    true
+  }
+
+  override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
+    val estype = getEsType(appId, channelId)
+    try {
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+      client.performRequest(
+        "POST",
+        s"/$index/$estype/_delete_by_query",
+        Map.empty[String, String].asJava,
+        entity).getStatusLine.getStatusCode match {
+          case 200 => true
+          case _ =>
+            error(s"Failed to remove $index/$estype")
+            false
+        }
+    } catch {
+      case e: Exception =>
+        error(s"Failed to remove $index/$estype", e)
+        false
+    }
+  }
+
+  override def close(): Unit = {
+    try client.close() catch {
+      case e: Exception =>
+        error("Failed to close client.", e)
+    }
+  }
+
+  override def futureInsert(
+    event: Event,
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      val id = event.eventId.getOrElse {
+        var roll = seq.genNext(seqName)
+        while (exists(estype, roll)) roll = seq.genNext(seqName)
+        roll.toString
+      }
+      val json = write(event.copy(eventId = Some(id)))
+      try {
+        val entity = new NStringEntity(json, ContentType.APPLICATION_JSON);
+        val response = client.performRequest(
+          "POST",
+          s"/$index/$estype/$id",
+          Map.empty[String, String].asJava,
+          entity)
+        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+        val result = (jsonResponse \ "result").extract[String]
+        result match {
+          case "created" => id
+          case "updated" => id
+          case _ =>
+            error(s"[$result] Failed to update $index/$estype/$id")
+            ""
+        }
+      } catch {
+        case e: IOException =>
+          error(s"Failed to update $index/$estype/$id: $json", e)
+          ""
+      }
+    }
+  }
+
+  private def exists(estype: String, id: Int): Boolean = {
+    try {
+      client.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+          case 200 => true
+          case _ => false
+        }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => false
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            false
+        }
+      case e: IOException =>
+        error(s"Failed to access to $index/$estype/$id", e)
+        false
+    }
+  }
+
+  override def futureGet(
+    eventId: String,
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      try {
+        val json =
+          ("query" ->
+            ("term" ->
+              ("eventId" -> eventId)))
+        val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+        val response = client.performRequest(
+          "POST",
+          s"/$index/$estype/_search",
+          Map.empty[String, String].asJava,
+          entity)
+        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+        (jsonResponse \ "hits" \ "total").extract[Long] match {
+          case 0 => None
+          case _ =>
+            val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]]
+            val result = (results.head \ "_source").extract[Event]
+            Some(result)
+        }
+      } catch {
+        case e: IOException =>
+          error("Failed to access to /$index/$estype/_search", e)
+          None
+      }
+    }
+  }
+
+  override def futureDelete(
+    eventId: String,
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      try {
+        val json =
+          ("query" ->
+            ("term" ->
+              ("eventId" -> eventId)))
+        val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+        val response = client.performRequest(
+          "POST",
+          s"/$index/$estype/_delete_by_query",
+          Map.empty[String, String].asJava)
+        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+        val result = (jsonResponse \ "result").extract[String]
+        result match {
+          case "deleted" => true
+          case _ =>
+            error(s"[$result] Failed to update $index/$estype:$eventId")
+            false
+        }
+      } catch {
+        case e: IOException =>
+          error(s"Failed to update $index/$estype:$eventId", e)
+          false
+      }
+    }
+  }
+
+  override def futureFind(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    limit: Option[Int] = None,
+    reversed: Option[Boolean] = None)
+    (implicit ec: ExecutionContext): Future[Iterator[Event]] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      try {
+        val query = ESUtils.createEventQuery(
+          startTime, untilTime, entityType, entityId,
+          eventNames, targetEntityType, targetEntityId, None)
+        ESUtils.getAll[Event](client, index, estype, query).toIterator
+      } catch {
+        case e: IOException =>
+          error(e.getMessage)
+          Iterator[Event]()
+      }
+    }
+  }
+
+}