You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/02/11 22:00:12 UTC

[05/10] incubator-predictionio git commit: add elasticsearch 1.x support

add elasticsearch 1.x support


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/bad2f038
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/bad2f038
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/bad2f038

Branch: refs/heads/feature/es5
Commit: bad2f0384d6d892eb576a6c2267bf88612a8eb66
Parents: 48e18b5
Author: Shinsuke Sugaya <sh...@yahoo.co.jp>
Authored: Wed Jan 18 16:29:15 2017 +0900
Committer: Shinsuke Sugaya <sh...@yahoo.co.jp>
Committed: Wed Jan 18 16:29:15 2017 +0900

----------------------------------------------------------------------
 .travis.yml                                     |   4 +-
 bin/install.sh                                  |   4 +-
 bin/pio-start-all                               |   9 +-
 build.sbt                                       |   2 +
 conf/pio-env.sh.template                        |   6 +
 core/build.sbt                                  |   1 +
 data/build.sbt                                  |   1 +
 .../storage/elasticsearch1/ESAccessKeys.scala   | 119 ++++++++++++++
 .../data/storage/elasticsearch1/ESApps.scala    | 130 +++++++++++++++
 .../storage/elasticsearch1/ESChannels.scala     | 117 ++++++++++++++
 .../elasticsearch1/ESEngineInstances.scala      | 158 +++++++++++++++++++
 .../elasticsearch1/ESEngineManifests.scala      |  84 ++++++++++
 .../elasticsearch1/ESEvaluationInstances.scala  | 136 ++++++++++++++++
 .../storage/elasticsearch1/ESSequences.scala    |  64 ++++++++
 .../data/storage/elasticsearch1/ESUtils.scala   |  48 ++++++
 .../storage/elasticsearch1/StorageClient.scala  |  50 ++++++
 .../data/storage/elasticsearch1/package.scala   |  25 +++
 project/Build.scala                             |   3 +
 tests/Dockerfile                                |   2 +
 tests/docker-files/env-conf/pio-env.sh          |  16 +-
 tools/build.sbt                                 |   1 -
 21 files changed, 969 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 68dee42..4d62999 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -63,8 +63,8 @@ env:
   matrix:
     - BUILD_TYPE=Unit
     - BUILD_TYPE=Integration METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
-    - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS
-    - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
+    - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH1 EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS
+    - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH1 EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
 
 before_install:
   - unset SBT_OPTS JVM_OPTS

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/bin/install.sh
----------------------------------------------------------------------
diff --git a/bin/install.sh b/bin/install.sh
index e4fe220..2f7c162 100755
--- a/bin/install.sh
+++ b/bin/install.sh
@@ -21,6 +21,7 @@ OS=`uname`
 PIO_VERSION=0.11.0-SNAPSHOT
 SPARK_VERSION=1.6.3
 # Looks like support for Elasticsearch 2.0 will require 2.0 so deferring
+#ELASTICSEARCH_VERSION=1.7.6
 ELASTICSEARCH_VERSION=5.1.2
 HBASE_VERSION=1.2.2
 POSTGRES_VERSION=9.4-1204.jdbc41
@@ -352,7 +353,8 @@ installES() {
     fi
     if [[ ! -e elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz ]]; then
       echo "Downloading Elasticsearch..."
-      curl -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
+      #curl -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
+      curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
     fi
     tar zxf elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
     rm -rf ${elasticsearch_dir}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/bin/pio-start-all
----------------------------------------------------------------------
diff --git a/bin/pio-start-all b/bin/pio-start-all
index 03e10ae..87130fa 100755
--- a/bin/pio-start-all
+++ b/bin/pio-start-all
@@ -33,6 +33,11 @@ SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE
 if [ `echo $SOURCE_TYPE | grep -i elasticsearch | wc -l` != 0 ] ; then
   echo "Starting Elasticsearch..."
   if [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME" ]; then
+    ELASTICSEARCH_HOME=$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME
+  elif [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME" ]; then
+    ELASTICSEARCH_HOME=$PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME
+  fi
+  if [ -n "$ELASTICSEARCH_HOME" ]; then
     if [ -n "$JAVA_HOME" ]; then
       JPS=`$JAVA_HOME/bin/jps`
     else
@@ -44,7 +49,7 @@ if [ `echo $SOURCE_TYPE | grep -i elasticsearch | wc -l` != 0 ] ; then
       echo -e "\033[0;31mAborting...\033[0m"
       exit 1
     else
-      $PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME/bin/elasticsearch -d -p $PIO_HOME/es.pid
+      $ELASTICSEARCH_HOME/bin/elasticsearch -d -p $PIO_HOME/es.pid
     fi
   else
     echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME in conf/pio-env.sh, or in your environment.\033[0m"
@@ -91,7 +96,7 @@ if [ `echo $SOURCE_TYPE | grep -i pgsql | wc -l` != 0 ] ; then
 fi
 
 # PredictionIO Event Server
-echo "Waiting 10 seconds for HBase/Elasticsearch to fully initialize..."
+echo "Waiting 10 seconds for Storage Repositories to fully initialize..."
 sleep 10
 echo "Starting PredictionIO Event Server..."
 ${PIO_HOME}/bin/pio-daemon ${PIO_HOME}/eventserver.pid eventserver --ip 0.0.0.0

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 4d6a5b6..776b2ad 100644
--- a/build.sbt
+++ b/build.sbt
@@ -36,6 +36,8 @@ javacOptions in (ThisBuild, compile) ++= Seq("-source", "1.7", "-target", "1.7",
 
 elasticsearchVersion in ThisBuild := "5.1.2"
 
+elasticsearch1Version in ThisBuild := "1.7.6"
+
 json4sVersion in ThisBuild := "3.2.10"
 
 sparkVersion in ThisBuild := "1.6.3"

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/conf/pio-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template
index a2841e3..8f5d7b1 100644
--- a/conf/pio-env.sh.template
+++ b/conf/pio-env.sh.template
@@ -89,6 +89,12 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2
+# Elasticsearch 1.x Example
+# PIO_STORAGE_SOURCES_ELASTICSEARCH1_TYPE=elasticsearch1
+# PIO_STORAGE_SOURCES_ELASTICSEARCH1_CLUSTERNAME=<elasticsearch_cluster_name>
+# PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOSTS=localhost
+# PIO_STORAGE_SOURCES_ELASTICSEARCH1_PORTS=9300
+# PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME=$PIO_HOME/vendors/elasticsearch-1.7.6
 
 # Local File System Example
 # PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/core/build.sbt
----------------------------------------------------------------------
diff --git a/core/build.sbt b/core/build.sbt
index abd8e07..305075e 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -33,6 +33,7 @@ libraryDependencies ++= Seq(
   "org.apache.spark"        %% "spark-sql"        % sparkVersion.value % "provided",
   "org.clapper"             %% "grizzled-slf4j"   % "1.0.2",
   "org.elasticsearch.client" % "rest"             % elasticsearchVersion.value,
+  "org.elasticsearch"        % "elasticsearch"    % elasticsearch1Version.value,
   "org.json4s"              %% "json4s-native"    % json4sVersion.value,
   "org.json4s"              %% "json4s-ext"       % json4sVersion.value,
   "org.scalaj"              %% "scalaj-http"      % "1.1.6",

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/build.sbt
----------------------------------------------------------------------
diff --git a/data/build.sbt b/data/build.sbt
index 4ae9b42..75d3c09 100644
--- a/data/build.sbt
+++ b/data/build.sbt
@@ -44,6 +44,7 @@ libraryDependencies ++= Seq(
   "org.apache.spark"        %% "spark-sql"      % sparkVersion.value % "provided",
   "org.clapper"             %% "grizzled-slf4j" % "1.0.2",
   "org.elasticsearch.client" % "rest"           % elasticsearchVersion.value,
+  "org.elasticsearch"        % "elasticsearch"  % elasticsearch1Version.value,
   "org.elasticsearch"        % "elasticsearch-spark-13_2.10" % elasticsearchVersion.value % "provided",
   "org.elasticsearch"        % "elasticsearch-hadoop-mr" % elasticsearchVersion.value,
   "org.json4s"              %% "json4s-native"  % json4sVersion.value,

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala
new file mode 100644
index 0000000..7f50488
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.AccessKey
+import org.apache.predictionio.data.storage.AccessKeys
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+import scala.util.Random
+
+/** Elasticsearch implementation of AccessKeys. */
+class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
+    extends AccessKeys with Logging {
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "accesskeys"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  def insert(accessKey: AccessKey): Option[String] = {
+    val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
+    update(accessKey.copy(key = key))
+    Some(key)
+  }
+
+  def get(key: String): Option[AccessKey] = {
+    try {
+      val response = client.prepareGet(
+        index,
+        estype,
+        key).get()
+      Some(read[AccessKey](response.getSourceAsString))
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+      case e: NullPointerException => None
+    }
+  }
+
+  def getAll(): Seq[AccessKey] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[AccessKey](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[AccessKey]()
+    }
+  }
+
+  def getByAppid(appid: Int): Seq[AccessKey] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("appid", appid))
+      ESUtils.getAll[AccessKey](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[AccessKey]()
+    }
+  }
+
+  def update(accessKey: AccessKey): Unit = {
+    try {
+      client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get()
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+
+  def delete(key: String): Unit = {
+    try {
+      client.prepareDelete(index, estype, key).get
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala
new file mode 100644
index 0000000..af61e17
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.App
+import org.apache.predictionio.data.storage.Apps
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Elasticsearch implementation of Items. */
+class ESApps(client: Client, config: StorageClientConfig, index: String)
+  extends Apps with Logging {
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "apps"
+  private val seq = new ESSequences(client, config, index)
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  def insert(app: App): Option[Int] = {
+    val id =
+      if (app.id == 0) {
+        var roll = seq.genNext("apps")
+        while (!get(roll).isEmpty) roll = seq.genNext("apps")
+        roll
+      }
+      else app.id
+    val realapp = app.copy(id = id)
+    update(realapp)
+    Some(id)
+  }
+
+  def get(id: Int): Option[App] = {
+    try {
+      val response = client.prepareGet(
+        index,
+        estype,
+        id.toString).get()
+      Some(read[App](response.getSourceAsString))
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+      case e: NullPointerException => None
+    }
+  }
+
+  def getByName(name: String): Option[App] = {
+    try {
+      val response = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("name", name)).get
+      val hits = response.getHits().hits()
+      if (hits.size > 0) {
+        Some(read[App](hits.head.getSourceAsString))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def getAll(): Seq[App] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[App](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[App]()
+    }
+  }
+
+  def update(app: App): Unit = {
+    try {
+      val response = client.prepareIndex(index, estype, app.id.toString).
+        setSource(write(app)).get()
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+
+  def delete(id: Int): Unit = {
+    try {
+      client.prepareDelete(index, estype, id.toString).get
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala
new file mode 100644
index 0000000..f955bee
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Channel
+import org.apache.predictionio.data.storage.Channels
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders.termFilter
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+class ESChannels(client: Client, config: StorageClientConfig, index: String)
+    extends Channels with Logging {
+
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "channels"
+  private val seq = new ESSequences(client, config, index)
+  private val seqName = "channels"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  def insert(channel: Channel): Option[Int] = {
+    val id =
+      if (channel.id == 0) {
+        var roll = seq.genNext(seqName)
+        while (!get(roll).isEmpty) roll = seq.genNext(seqName)
+        roll
+      } else channel.id
+
+    val realChannel = channel.copy(id = id)
+    if (update(realChannel)) Some(id) else None
+  }
+
+  def get(id: Int): Option[Channel] = {
+    try {
+      val response = client.prepareGet(
+        index,
+        estype,
+        id.toString).get()
+      Some(read[Channel](response.getSourceAsString))
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+      case e: NullPointerException => None
+    }
+  }
+
+  def getByAppid(appid: Int): Seq[Channel] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("appid", appid))
+      ESUtils.getAll[Channel](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[Channel]()
+    }
+  }
+
+  def update(channel: Channel): Boolean = {
+    try {
+      val response = client.prepareIndex(index, estype, channel.id.toString).
+        setSource(write(channel)).get()
+      true
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        false
+    }
+  }
+
+  def delete(id: Int): Unit = {
+    try {
+      client.prepareDelete(index, estype, id.toString).get
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala
new file mode 100644
index 0000000..cc10ff0
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineInstance
+import org.apache.predictionio.data.storage.EngineInstanceSerializer
+import org.apache.predictionio.data.storage.EngineInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.elasticsearch.search.sort.SortOrder
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+class ESEngineInstances(client: Client, config: StorageClientConfig, index: String)
+  extends EngineInstances with Logging {
+  implicit val formats = DefaultFormats + new EngineInstanceSerializer
+  private val estype = "engine_instances"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("startTime" -> ("type" -> "date")) ~
+          ("endTime" -> ("type" -> "date")) ~
+          ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineVersion" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineVariant" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineFactory" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("batch" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("dataSourceParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("preparatorParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("algorithmsParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("servingParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  def insert(i: EngineInstance): String = {
+    try {
+      val response = client.prepareIndex(index, estype).
+        setSource(write(i)).get
+      response.getId
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        ""
+    }
+  }
+
+  def get(id: String): Option[EngineInstance] = {
+    try {
+      val response = client.prepareGet(index, estype, id).get
+      if (response.isExists) {
+        Some(read[EngineInstance](response.getSourceAsString))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def getAll(): Seq[EngineInstance] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[EngineInstance](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  def getCompleted(
+      engineId: String,
+      engineVersion: String,
+      engineVariant: String): Seq[EngineInstance] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
+        andFilter(
+          termFilter("status", "COMPLETED"),
+          termFilter("engineId", engineId),
+          termFilter("engineVersion", engineVersion),
+          termFilter("engineVariant", engineVariant))).
+        addSort("startTime", SortOrder.DESC)
+      ESUtils.getAll[EngineInstance](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  def getLatestCompleted(
+      engineId: String,
+      engineVersion: String,
+      engineVariant: String): Option[EngineInstance] =
+    getCompleted(
+      engineId,
+      engineVersion,
+      engineVariant).headOption
+
+  def update(i: EngineInstance): Unit = {
+    try {
+      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+
+  def delete(id: String): Unit = {
+    try {
+      val response = client.prepareDelete(index, estype, id).get
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala
new file mode 100644
index 0000000..307b582
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineManifestSerializer
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.data.storage.EngineManifests
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.json4s._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+class ESEngineManifests(client: Client, config: StorageClientConfig, index: String)
+  extends EngineManifests with Logging {
+  implicit val formats = DefaultFormats + new EngineManifestSerializer
+  private val estype = "engine_manifests"
+  private def esid(id: String, version: String) = s"$id $version"
+
+  def insert(engineManifest: EngineManifest): Unit = {
+    val json = write(engineManifest)
+    val response = client.prepareIndex(
+      index,
+      estype,
+      esid(engineManifest.id, engineManifest.version)).
+      setSource(json).execute().actionGet()
+  }
+
+  def get(id: String, version: String): Option[EngineManifest] = {
+    try {
+      val response = client.prepareGet(index, estype, esid(id, version)).
+        execute().actionGet()
+      if (response.isExists) {
+        Some(read[EngineManifest](response.getSourceAsString))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def getAll(): Seq[EngineManifest] = {
+    try {
+      val builder = client.prepareSearch()
+      ESUtils.getAll[EngineManifest](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  def update(engineManifest: EngineManifest, upsert: Boolean = false): Unit =
+    insert(engineManifest)
+
+  def delete(id: String, version: String): Unit = {
+    try {
+      client.prepareDelete(index, estype, esid(id, version)).execute().actionGet()
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala
new file mode 100644
index 0000000..b8d7056
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EvaluationInstance
+import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
+import org.apache.predictionio.data.storage.EvaluationInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.elasticsearch.search.sort.SortOrder
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String)
+  extends EvaluationInstances with Logging {
+  implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
+  private val estype = "evaluation_instances"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("startTime" -> ("type" -> "date")) ~
+          ("endTime" -> ("type" -> "date")) ~
+          ("evaluationClass" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineParamsGeneratorClass" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("batch" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("evaluatorResults" ->
+            ("type" -> "string") ~ ("index" -> "no")) ~
+          ("evaluatorResultsHTML" ->
+            ("type" -> "string") ~ ("index" -> "no")) ~
+          ("evaluatorResultsJSON" ->
+            ("type" -> "string") ~ ("index" -> "no"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  def insert(i: EvaluationInstance): String = {
+    try {
+      val response = client.prepareIndex(index, estype).
+        setSource(write(i)).get
+      response.getId
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        ""
+    }
+  }
+
+  def get(id: String): Option[EvaluationInstance] = {
+    try {
+      val response = client.prepareGet(index, estype, id).get
+      if (response.isExists) {
+        Some(read[EvaluationInstance](response.getSourceAsString))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def getAll(): Seq[EvaluationInstance] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[EvaluationInstance](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  def getCompleted(): Seq[EvaluationInstance] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
+        termFilter("status", "EVALCOMPLETED")).
+        addSort("startTime", SortOrder.DESC)
+      ESUtils.getAll[EvaluationInstance](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  def update(i: EvaluationInstance): Unit = {
+    try {
+      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+
+  def delete(id: String): Unit = {
+    try {
+      client.prepareDelete(index, estype, id).get
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala
new file mode 100644
index 0000000..80247ec
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+
+class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging {
+  implicit val formats = DefaultFormats
+  private val estype = "sequences"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    // val settingsJson =
+    //   ("number_of_shards" -> 1) ~
+    //   ("auto_expand_replicas" -> "0-all")
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val mappingJson =
+      (estype ->
+        ("_source" -> ("enabled" -> 0)) ~
+        ("_all" -> ("enabled" -> 0)) ~
+        ("_type" -> ("index" -> "no")) ~
+        ("enabled" -> 0))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(mappingJson))).get
+  }
+
+  def genNext(name: String): Int = {
+    try {
+      val response = client.prepareIndex(index, estype, name).
+        setSource(compact(render("n" -> name))).get
+      response.getVersion().toInt
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        0
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala
new file mode 100644
index 0000000..5de2999
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import org.elasticsearch.action.search.SearchRequestBuilder
+import org.elasticsearch.client.Client
+import org.elasticsearch.common.unit.TimeValue
+import org.json4s.Formats
+import org.json4s.native.Serialization.read
+
+import scala.collection.mutable.ArrayBuffer
+
+object ESUtils {
+  val scrollLife = new TimeValue(60000)
+
+  def getAll[T : Manifest](
+      client: Client,
+      builder: SearchRequestBuilder)(
+      implicit formats: Formats): Seq[T] = {
+    val results = ArrayBuffer[T]()
+    var response = builder.setScroll(scrollLife).get
+    var hits = response.getHits().hits()
+    results ++= hits.map(h => read[T](h.getSourceAsString))
+    while (hits.size > 0) {
+      response = client.prepareSearchScroll(response.getScrollId).
+        setScroll(scrollLife).get
+      hits = response.getHits().hits()
+      results ++= hits.map(h => read[T](h.getSourceAsString))
+    }
+    results
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
new file mode 100644
index 0000000..6f6b1c9
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.transport.TransportClient
+import org.elasticsearch.common.settings.ImmutableSettings
+import org.elasticsearch.common.transport.InetSocketTransportAddress
+import org.elasticsearch.transport.ConnectTransportException
+
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+    with Logging {
+  override val prefix = "ES"
+  val client = try {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300))
+    val settings = ImmutableSettings.settingsBuilder()
+      .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch"))
+    val transportClient = new TransportClient(settings)
+    (hosts zip ports) foreach { hp =>
+      transportClient.addTransportAddress(
+        new InetSocketTransportAddress(hp._1, hp._2))
+    }
+    transportClient
+  } catch {
+    case e: ConnectTransportException =>
+      throw new StorageClientException(e.getMessage, e)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
new file mode 100644
index 0000000..d6aa24a
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage
+
+/** Elasticsearch implementation of storage traits, supporting meta data only
+  *
+  * @group Implementation
+  */
+package object elasticsearch1 {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 885073a..a8d730b 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -22,6 +22,9 @@ object PIOBuild extends Build {
   val elasticsearchVersion = SettingKey[String](
     "elasticsearch-version",
     "The version of Elasticsearch used for building.")
+  val elasticsearch1Version = SettingKey[String](
+    "elasticsearch1-version",
+    "The version of Elasticsearch 1.x used for building.")
   val json4sVersion = SettingKey[String](
     "json4s-version",
     "The version of JSON4S used for building.")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/tests/Dockerfile
----------------------------------------------------------------------
diff --git a/tests/Dockerfile b/tests/Dockerfile
index 1f87554..0fe7187 100644
--- a/tests/Dockerfile
+++ b/tests/Dockerfile
@@ -18,6 +18,7 @@
 from ubuntu
 
 ENV SPARK_VERSION 1.4.0
+#ENV ELASTICSEARCH_VERSION 1.4.4
 ENV ELASTICSEARCH_VERSION 5.1.2
 ENV HBASE_VERSION 1.0.0
 
@@ -48,6 +49,7 @@ RUN rm spark-${SPARK_VERSION}-bin-hadoop2.6.tgz
 ENV SPARK_HOME /vendors/spark-${SPARK_VERSION}-bin-hadoop2.6
 
 RUN echo "== Installing Elasticsearch =="
+#RUN wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
 RUN wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
 RUN tar zxvfC elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz /vendors
 RUN rm elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/tests/docker-files/env-conf/pio-env.sh
----------------------------------------------------------------------
diff --git a/tests/docker-files/env-conf/pio-env.sh b/tests/docker-files/env-conf/pio-env.sh
index 7ea2164..aa18ff8 100644
--- a/tests/docker-files/env-conf/pio-env.sh
+++ b/tests/docker-files/env-conf/pio-env.sh
@@ -84,11 +84,17 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio
 # PIO_STORAGE_SOURCES_MYSQL_PASSWORD=pio
 
 # Elasticsearch Example
-PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch
-PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost
-PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
-PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
-PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$ELASTICSEARCH_HOME
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2
+# Elasticsearch 1.x Example
+PIO_STORAGE_SOURCES_ELASTICSEARCH1_TYPE=elasticsearch1
+#PIO_STORAGE_SOURCES_ELASTICSEARCH1_CLUSTERNAME=pio
+PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOSTS=localhost
+PIO_STORAGE_SOURCES_ELASTICSEARCH1_PORTS=9300
+PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME=$ELASTICSEARCH_HOME
 
 # Local File System Example
 PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/tools/build.sbt
----------------------------------------------------------------------
diff --git a/tools/build.sbt b/tools/build.sbt
index 4e2b266..108e2fb 100644
--- a/tools/build.sbt
+++ b/tools/build.sbt
@@ -42,7 +42,6 @@ dependencyOverrides +=   "org.slf4j" % "slf4j-log4j12" % "1.7.18"
 assemblyMergeStrategy in assembly := {
   case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
   case PathList("META-INF", "NOTICE.txt")  => MergeStrategy.concat
-  case PathList("org", "joda", "time", "base", "BaseDateTime.class") => MergeStrategy.first
   case x =>
     val oldStrategy = (assemblyMergeStrategy in assembly).value
     oldStrategy(x)