You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/02/11 22:00:12 UTC
[05/10] incubator-predictionio git commit: add elasticsearch 1.x
support
add elasticsearch 1.x support
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/bad2f038
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/bad2f038
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/bad2f038
Branch: refs/heads/feature/es5
Commit: bad2f0384d6d892eb576a6c2267bf88612a8eb66
Parents: 48e18b5
Author: Shinsuke Sugaya <sh...@yahoo.co.jp>
Authored: Wed Jan 18 16:29:15 2017 +0900
Committer: Shinsuke Sugaya <sh...@yahoo.co.jp>
Committed: Wed Jan 18 16:29:15 2017 +0900
----------------------------------------------------------------------
.travis.yml | 4 +-
bin/install.sh | 4 +-
bin/pio-start-all | 9 +-
build.sbt | 2 +
conf/pio-env.sh.template | 6 +
core/build.sbt | 1 +
data/build.sbt | 1 +
.../storage/elasticsearch1/ESAccessKeys.scala | 119 ++++++++++++++
.../data/storage/elasticsearch1/ESApps.scala | 130 +++++++++++++++
.../storage/elasticsearch1/ESChannels.scala | 117 ++++++++++++++
.../elasticsearch1/ESEngineInstances.scala | 158 +++++++++++++++++++
.../elasticsearch1/ESEngineManifests.scala | 84 ++++++++++
.../elasticsearch1/ESEvaluationInstances.scala | 136 ++++++++++++++++
.../storage/elasticsearch1/ESSequences.scala | 64 ++++++++
.../data/storage/elasticsearch1/ESUtils.scala | 48 ++++++
.../storage/elasticsearch1/StorageClient.scala | 50 ++++++
.../data/storage/elasticsearch1/package.scala | 25 +++
project/Build.scala | 3 +
tests/Dockerfile | 2 +
tests/docker-files/env-conf/pio-env.sh | 16 +-
tools/build.sbt | 1 -
21 files changed, 969 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 68dee42..4d62999 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -63,8 +63,8 @@ env:
matrix:
- BUILD_TYPE=Unit
- BUILD_TYPE=Integration METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
- - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS
- - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
+ - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH1 EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS
+ - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH1 EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
before_install:
- unset SBT_OPTS JVM_OPTS
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/bin/install.sh
----------------------------------------------------------------------
diff --git a/bin/install.sh b/bin/install.sh
index e4fe220..2f7c162 100755
--- a/bin/install.sh
+++ b/bin/install.sh
@@ -21,6 +21,7 @@ OS=`uname`
PIO_VERSION=0.11.0-SNAPSHOT
SPARK_VERSION=1.6.3
# Looks like support for Elasticsearch 2.0 will require 2.0 so deferring
+#ELASTICSEARCH_VERSION=1.7.6
ELASTICSEARCH_VERSION=5.1.2
HBASE_VERSION=1.2.2
POSTGRES_VERSION=9.4-1204.jdbc41
@@ -352,7 +353,8 @@ installES() {
fi
if [[ ! -e elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz ]]; then
echo "Downloading Elasticsearch..."
- curl -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
+ #curl -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
+ curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
fi
tar zxf elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
rm -rf ${elasticsearch_dir}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/bin/pio-start-all
----------------------------------------------------------------------
diff --git a/bin/pio-start-all b/bin/pio-start-all
index 03e10ae..87130fa 100755
--- a/bin/pio-start-all
+++ b/bin/pio-start-all
@@ -33,6 +33,11 @@ SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE
if [ `echo $SOURCE_TYPE | grep -i elasticsearch | wc -l` != 0 ] ; then
echo "Starting Elasticsearch..."
if [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME" ]; then
+ ELASTICSEARCH_HOME=$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME
+ elif [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME" ]; then
+ ELASTICSEARCH_HOME=$PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME
+ fi
+ if [ -n "$ELASTICSEARCH_HOME" ]; then
if [ -n "$JAVA_HOME" ]; then
JPS=`$JAVA_HOME/bin/jps`
else
@@ -44,7 +49,7 @@ if [ `echo $SOURCE_TYPE | grep -i elasticsearch | wc -l` != 0 ] ; then
echo -e "\033[0;31mAborting...\033[0m"
exit 1
else
- $PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME/bin/elasticsearch -d -p $PIO_HOME/es.pid
+ $ELASTICSEARCH_HOME/bin/elasticsearch -d -p $PIO_HOME/es.pid
fi
else
echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME in conf/pio-env.sh, or in your environment.\033[0m"
@@ -91,7 +96,7 @@ if [ `echo $SOURCE_TYPE | grep -i pgsql | wc -l` != 0 ] ; then
fi
# PredictionIO Event Server
-echo "Waiting 10 seconds for HBase/Elasticsearch to fully initialize..."
+echo "Waiting 10 seconds for Storage Repositories to fully initialize..."
sleep 10
echo "Starting PredictionIO Event Server..."
${PIO_HOME}/bin/pio-daemon ${PIO_HOME}/eventserver.pid eventserver --ip 0.0.0.0
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 4d6a5b6..776b2ad 100644
--- a/build.sbt
+++ b/build.sbt
@@ -36,6 +36,8 @@ javacOptions in (ThisBuild, compile) ++= Seq("-source", "1.7", "-target", "1.7",
elasticsearchVersion in ThisBuild := "5.1.2"
+elasticsearch1Version in ThisBuild := "1.7.6"
+
json4sVersion in ThisBuild := "3.2.10"
sparkVersion in ThisBuild := "1.6.3"
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/conf/pio-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template
index a2841e3..8f5d7b1 100644
--- a/conf/pio-env.sh.template
+++ b/conf/pio-env.sh.template
@@ -89,6 +89,12 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio
# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
# PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2
+# Elasticsearch 1.x Example
+# PIO_STORAGE_SOURCES_ELASTICSEARCH1_TYPE=elasticsearch1
+# PIO_STORAGE_SOURCES_ELASTICSEARCH1_CLUSTERNAME=<elasticsearch_cluster_name>
+# PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOSTS=localhost
+# PIO_STORAGE_SOURCES_ELASTICSEARCH1_PORTS=9300
+# PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME=$PIO_HOME/vendors/elasticsearch-1.7.6
# Local File System Example
# PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/core/build.sbt
----------------------------------------------------------------------
diff --git a/core/build.sbt b/core/build.sbt
index abd8e07..305075e 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -33,6 +33,7 @@ libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.clapper" %% "grizzled-slf4j" % "1.0.2",
"org.elasticsearch.client" % "rest" % elasticsearchVersion.value,
+ "org.elasticsearch" % "elasticsearch" % elasticsearch1Version.value,
"org.json4s" %% "json4s-native" % json4sVersion.value,
"org.json4s" %% "json4s-ext" % json4sVersion.value,
"org.scalaj" %% "scalaj-http" % "1.1.6",
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/build.sbt
----------------------------------------------------------------------
diff --git a/data/build.sbt b/data/build.sbt
index 4ae9b42..75d3c09 100644
--- a/data/build.sbt
+++ b/data/build.sbt
@@ -44,6 +44,7 @@ libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.clapper" %% "grizzled-slf4j" % "1.0.2",
"org.elasticsearch.client" % "rest" % elasticsearchVersion.value,
+ "org.elasticsearch" % "elasticsearch" % elasticsearch1Version.value,
"org.elasticsearch" % "elasticsearch-spark-13_2.10" % elasticsearchVersion.value % "provided",
"org.elasticsearch" % "elasticsearch-hadoop-mr" % elasticsearchVersion.value,
"org.json4s" %% "json4s-native" % json4sVersion.value,
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala
new file mode 100644
index 0000000..7f50488
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.AccessKey
+import org.apache.predictionio.data.storage.AccessKeys
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+import scala.util.Random
+
+/** Elasticsearch 1.x implementation of AccessKeys.
+  *
+  * On construction the index is created when it does not exist yet, and a
+  * mapping for the `accesskeys` type is installed when the type is missing.
+  * The mapping declares `key` and `events` as not-analyzed strings.
+  *
+  * @param client Elasticsearch client used for every request
+  * @param config storage client configuration (not read by this class)
+  * @param index  name of the index that holds the access key documents
+  */
+class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
+    extends AccessKeys with Logging {
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "accesskeys"
+
+  // Index / mapping bootstrap, executed once per instance.
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  /** Stores an access key, generating a key with `generateKey` when the
+    * supplied one is empty.
+    *
+    * @return always `Some(key)`; an Elasticsearch failure inside `update` is
+    *         logged there and is not reflected in the result
+    */
+  def insert(accessKey: AccessKey): Option[String] = {
+    val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
+    update(accessKey.copy(key = key))
+    Some(key)
+  }
+
+  /** Looks up a single access key by its document id.
+    *
+    * @return the stored key, or `None` when the document does not exist, has
+    *         no source, or the request fails (the failure is logged)
+    */
+  def get(key: String): Option[AccessKey] = {
+    try {
+      val response = client.prepareGet(index, estype, key).get()
+      if (response.isExists) {
+        // Explicit existence check instead of relying on a
+        // NullPointerException raised while parsing a null source.
+        Option(response.getSourceAsString).map(src => read[AccessKey](src))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  /** Returns every access key of this type via `ESUtils.getAll`; an empty
+    * sequence is returned (and the error logged) when the search fails.
+    */
+  def getAll(): Seq[AccessKey] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[AccessKey](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[AccessKey]()
+    }
+  }
+
+  /** Returns the access keys whose `appid` field equals the given id; an
+    * empty sequence is returned (and the error logged) when the search fails.
+    */
+  def getByAppid(appid: Int): Seq[AccessKey] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("appid", appid))
+      ESUtils.getAll[AccessKey](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[AccessKey]()
+    }
+  }
+
+  /** Indexes the access key under its own key as document id, replacing any
+    * existing document. Elasticsearch failures are logged and swallowed.
+    */
+  def update(accessKey: AccessKey): Unit = {
+    try {
+      client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get()
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+
+  /** Deletes the document stored under `key`. Elasticsearch failures are
+    * logged and swallowed.
+    */
+  def delete(key: String): Unit = {
+    try {
+      client.prepareDelete(index, estype, key).get
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala
new file mode 100644
index 0000000..af61e17
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.App
+import org.apache.predictionio.data.storage.Apps
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Elasticsearch 1.x implementation of Apps.
+  *
+  * On construction the index is created when it does not exist yet, and a
+  * mapping for the `apps` type is installed when the type is missing. The
+  * mapping declares `name` as a not-analyzed string.
+  *
+  * @param client Elasticsearch client used for every request
+  * @param config storage client configuration (passed on to ESSequences)
+  * @param index  name of the index that holds the app documents
+  */
+class ESApps(client: Client, config: StorageClientConfig, index: String)
+    extends Apps with Logging {
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "apps"
+  private val seq = new ESSequences(client, config, index)
+  // Name of the sequence used to generate app ids.
+  private val seqName = "apps"
+
+  // Index / mapping bootstrap, executed once per instance.
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  /** Stores an app. When `app.id` is 0 a fresh id is drawn from the sequence,
+    * skipping ids that are already taken.
+    *
+    * @return always `Some(id)`; an Elasticsearch failure inside `update` is
+    *         logged there and is not reflected in the result
+    */
+  def insert(app: App): Option[Int] = {
+    val id =
+      if (app.id == 0) {
+        var roll = seq.genNext(seqName)
+        while (!get(roll).isEmpty) roll = seq.genNext(seqName)
+        roll
+      } else {
+        app.id
+      }
+    update(app.copy(id = id))
+    Some(id)
+  }
+
+  /** Looks up an app by id.
+    *
+    * @return the stored app, or `None` when the document does not exist, has
+    *         no source, or the request fails (the failure is logged)
+    */
+  def get(id: Int): Option[App] = {
+    try {
+      val response = client.prepareGet(index, estype, id.toString).get()
+      if (response.isExists) {
+        // Explicit existence check instead of relying on a
+        // NullPointerException raised while parsing a null source.
+        Option(response.getSourceAsString).map(src => read[App](src))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  /** Returns the first search hit whose `name` field equals the given name,
+    * or `None` when there is no hit or the search fails (failure is logged).
+    */
+  def getByName(name: String): Option[App] = {
+    try {
+      val response = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("name", name)).get
+      response.getHits().hits().headOption.map(hit => read[App](hit.getSourceAsString))
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  /** Returns every app of this type via `ESUtils.getAll`; an empty sequence
+    * is returned (and the error logged) when the search fails.
+    */
+  def getAll(): Seq[App] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[App](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[App]()
+    }
+  }
+
+  /** Indexes the app under its id as document id, replacing any existing
+    * document. Elasticsearch failures are logged and swallowed.
+    */
+  def update(app: App): Unit = {
+    try {
+      client.prepareIndex(index, estype, app.id.toString).
+        setSource(write(app)).get()
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+
+  /** Deletes the app document with the given id. Elasticsearch failures are
+    * logged and swallowed.
+    */
+  def delete(id: Int): Unit = {
+    try {
+      client.prepareDelete(index, estype, id.toString).get
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala
new file mode 100644
index 0000000..f955bee
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Channel
+import org.apache.predictionio.data.storage.Channels
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders.termFilter
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Elasticsearch 1.x implementation of Channels.
+  *
+  * On construction the index is created when it does not exist yet, and a
+  * mapping for the `channels` type is installed when the type is missing.
+  * The mapping declares `name` as a not-analyzed string.
+  *
+  * @param client Elasticsearch client used for every request
+  * @param config storage client configuration (passed on to ESSequences)
+  * @param index  name of the index that holds the channel documents
+  */
+class ESChannels(client: Client, config: StorageClientConfig, index: String)
+    extends Channels with Logging {
+
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "channels"
+  private val seq = new ESSequences(client, config, index)
+  private val seqName = "channels"
+
+  // Index / mapping bootstrap, executed once per instance.
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  /** Stores a channel. When `channel.id` is 0 a fresh id is drawn from the
+    * sequence, skipping ids that are already taken.
+    *
+    * @return `Some(id)` when the document was indexed, `None` when `update`
+    *         reported a failure
+    */
+  def insert(channel: Channel): Option[Int] = {
+    val id =
+      if (channel.id == 0) {
+        var roll = seq.genNext(seqName)
+        while (!get(roll).isEmpty) roll = seq.genNext(seqName)
+        roll
+      } else {
+        channel.id
+      }
+
+    if (update(channel.copy(id = id))) Some(id) else None
+  }
+
+  /** Looks up a channel by id.
+    *
+    * @return the stored channel, or `None` when the document does not exist,
+    *         has no source, or the request fails (the failure is logged)
+    */
+  def get(id: Int): Option[Channel] = {
+    try {
+      val response = client.prepareGet(index, estype, id.toString).get()
+      if (response.isExists) {
+        // Explicit existence check instead of relying on a
+        // NullPointerException raised while parsing a null source.
+        Option(response.getSourceAsString).map(src => read[Channel](src))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  /** Returns the channels whose `appid` field equals the given id; an empty
+    * sequence is returned (and the error logged) when the search fails.
+    */
+  def getByAppid(appid: Int): Seq[Channel] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("appid", appid))
+      ESUtils.getAll[Channel](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[Channel]()
+    }
+  }
+
+  /** Indexes the channel under its id as document id, replacing any existing
+    * document.
+    *
+    * @return `true` when the request completed, `false` when Elasticsearch
+    *         raised an exception (which is logged)
+    */
+  def update(channel: Channel): Boolean = {
+    try {
+      client.prepareIndex(index, estype, channel.id.toString).
+        setSource(write(channel)).get()
+      true
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        false
+    }
+  }
+
+  /** Deletes the channel document with the given id. Elasticsearch failures
+    * are logged and swallowed.
+    */
+  def delete(id: Int): Unit = {
+    try {
+      client.prepareDelete(index, estype, id.toString).get
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala
new file mode 100644
index 0000000..cc10ff0
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineInstance
+import org.apache.predictionio.data.storage.EngineInstanceSerializer
+import org.apache.predictionio.data.storage.EngineInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.elasticsearch.search.sort.SortOrder
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Elasticsearch 1.x implementation of EngineInstances.
+  *
+  * On construction the index is created when it does not exist yet, and a
+  * mapping for the `engine_instances` type is installed when the type is
+  * missing. `startTime` and `endTime` are mapped as dates; every other
+  * declared field is a not-analyzed string.
+  *
+  * @param client Elasticsearch client used for every request
+  * @param config storage client configuration (not read by this class)
+  * @param index  name of the index that holds the engine instance documents
+  */
+class ESEngineInstances(client: Client, config: StorageClientConfig, index: String)
+    extends EngineInstances with Logging {
+  implicit val formats = DefaultFormats + new EngineInstanceSerializer
+  private val estype = "engine_instances"
+
+  // Index / mapping bootstrap, executed once per instance.
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    // Each property is declared exactly once; a repeated "status" entry
+    // would render a JSON object with a duplicate key.
+    val json =
+      (estype ->
+        ("properties" ->
+          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("startTime" -> ("type" -> "date")) ~
+          ("endTime" -> ("type" -> "date")) ~
+          ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineVersion" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineVariant" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineFactory" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("batch" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("dataSourceParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("preparatorParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("algorithmsParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("servingParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  /** Indexes a new engine instance without supplying a document id.
+    *
+    * @return the id reported by Elasticsearch, or an empty string when the
+    *         request fails (the failure is logged)
+    */
+  def insert(i: EngineInstance): String = {
+    try {
+      val response = client.prepareIndex(index, estype).
+        setSource(write(i)).get
+      response.getId
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        ""
+    }
+  }
+
+  /** Looks up an engine instance by document id.
+    *
+    * @return the stored instance, or `None` when the document does not exist
+    *         or the request fails (the failure is logged)
+    */
+  def get(id: String): Option[EngineInstance] = {
+    try {
+      val response = client.prepareGet(index, estype, id).get
+      if (response.isExists) {
+        Some(read[EngineInstance](response.getSourceAsString))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  /** Returns every engine instance of this type via `ESUtils.getAll`; an
+    * empty sequence is returned (and the error logged) when the search fails.
+    */
+  def getAll(): Seq[EngineInstance] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[EngineInstance](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  /** Returns the instances with status `COMPLETED` that match the given
+    * engine id, version and variant, requested in descending `startTime`
+    * order. An empty sequence is returned (and the error logged) when the
+    * search fails.
+    */
+  def getCompleted(
+      engineId: String,
+      engineVersion: String,
+      engineVariant: String): Seq[EngineInstance] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
+        andFilter(
+          termFilter("status", "COMPLETED"),
+          termFilter("engineId", engineId),
+          termFilter("engineVersion", engineVersion),
+          termFilter("engineVariant", engineVariant))).
+        addSort("startTime", SortOrder.DESC)
+      ESUtils.getAll[EngineInstance](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  /** Returns the first element of `getCompleted` for the same arguments, or
+    * `None` when that sequence is empty.
+    */
+  def getLatestCompleted(
+      engineId: String,
+      engineVersion: String,
+      engineVariant: String): Option[EngineInstance] =
+    getCompleted(
+      engineId,
+      engineVersion,
+      engineVariant).headOption
+
+  /** Sends an update request for the document with id `i.id`, using the
+    * serialized instance as the partial document. Elasticsearch failures are
+    * logged and swallowed.
+    */
+  def update(i: EngineInstance): Unit = {
+    try {
+      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+
+  /** Deletes the engine instance document with the given id. Elasticsearch
+    * failures are logged and swallowed.
+    */
+  def delete(id: String): Unit = {
+    try {
+      client.prepareDelete(index, estype, id).get
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala
new file mode 100644
index 0000000..307b582
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineManifestSerializer
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.data.storage.EngineManifests
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.json4s._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Elasticsearch 1.x implementation of EngineManifests.
+  *
+  * Documents are stored under the `engine_manifests` type with a document id
+  * made of the manifest id and version separated by a single space.
+  *
+  * @param client Elasticsearch client used for every request
+  * @param config storage client configuration (not read by this class)
+  * @param index  name of the index that holds the manifest documents
+  */
+class ESEngineManifests(client: Client, config: StorageClientConfig, index: String)
+    extends EngineManifests with Logging {
+  implicit val formats = DefaultFormats + new EngineManifestSerializer
+  private val estype = "engine_manifests"
+
+  /** Builds the document id for a manifest id / version pair. */
+  private def esid(id: String, version: String) = s"$id $version"
+
+  /** Indexes the manifest, replacing any document with the same id/version.
+    *
+    * Unlike the other operations of this class, this method has no
+    * try/catch, so exceptions raised by the client propagate to the caller.
+    */
+  def insert(engineManifest: EngineManifest): Unit = {
+    val json = write(engineManifest)
+    client.prepareIndex(
+      index,
+      estype,
+      esid(engineManifest.id, engineManifest.version)).
+      setSource(json).execute().actionGet()
+  }
+
+  /** Looks up a manifest by id and version.
+    *
+    * @return the stored manifest, or `None` when the document does not exist
+    *         or the request fails (the failure is logged)
+    */
+  def get(id: String, version: String): Option[EngineManifest] = {
+    try {
+      val response = client.prepareGet(index, estype, esid(id, version)).
+        execute().actionGet()
+      if (response.isExists) {
+        Some(read[EngineManifest](response.getSourceAsString))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  /** Returns every manifest of this type via `ESUtils.getAll`; an empty
+    * sequence is returned (and the error logged) when the search fails.
+    */
+  def getAll(): Seq[EngineManifest] = {
+    try {
+      // Restrict the search to this index and type; an unscoped
+      // prepareSearch() would query every index and type and hand
+      // unrelated documents to the EngineManifest deserializer.
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[EngineManifest](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  /** Delegates to `insert`; the `upsert` flag is not consulted because
+    * indexing under a fixed document id already replaces an existing one.
+    */
+  def update(engineManifest: EngineManifest, upsert: Boolean = false): Unit =
+    insert(engineManifest)
+
+  /** Deletes the manifest document for the given id and version.
+    * Elasticsearch failures are logged and swallowed.
+    */
+  def delete(id: String, version: String): Unit = {
+    try {
+      client.prepareDelete(index, estype, esid(id, version)).execute().actionGet()
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala
new file mode 100644
index 0000000..b8d7056
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EvaluationInstance
+import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
+import org.apache.predictionio.data.storage.EvaluationInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.elasticsearch.search.sort.SortOrder
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Elasticsearch 1.x implementation of [[EvaluationInstances]].
+ *
+ * Constructing an instance creates the index and the "evaluation_instances"
+ * mapping when they are missing. Methods log an ElasticsearchException and
+ * return a fallback ("", None, Seq()) or nothing instead of rethrowing it.
+ */
+class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String)
+    extends EvaluationInstances with Logging {
+  implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
+  private val estype = "evaluation_instances"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) indices.prepareCreate(index).get
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val exactString = ("type" -> "string") ~ ("index" -> "not_analyzed")
+    val unindexedString = ("type" -> "string") ~ ("index" -> "no")
+    val properties =
+      ("status" -> exactString) ~
+      ("startTime" -> ("type" -> "date")) ~
+      ("endTime" -> ("type" -> "date")) ~
+      ("evaluationClass" -> exactString) ~
+      ("engineParamsGeneratorClass" -> exactString) ~
+      ("batch" -> exactString) ~
+      ("evaluatorResults" -> unindexedString) ~
+      ("evaluatorResultsHTML" -> unindexedString) ~
+      ("evaluatorResultsJSON" -> unindexedString)
+    val json = estype -> ("properties" -> properties)
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  /** Indexes `i` without an explicit id; returns the response's id, or "" on error. */
+  def insert(i: EvaluationInstance): String = {
+    try {
+      client.prepareIndex(index, estype).setSource(write(i)).get.getId
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        ""
+    }
+  }
+
+  /** Returns the instance stored under `id`; None if it is missing or on error. */
+  def get(id: String): Option[EvaluationInstance] = {
+    try {
+      val found = client.prepareGet(index, estype, id).get
+      if (!found.isExists) None
+      else Some(read[EvaluationInstance](found.getSourceAsString))
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  /** Returns every stored instance of this type; an empty Seq on error. */
+  def getAll(): Seq[EvaluationInstance] = {
+    try {
+      val everything = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[EvaluationInstance](client, everything)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  /** Returns instances with status "EVALCOMPLETED", sorted by startTime descending. */
+  def getCompleted(): Seq[EvaluationInstance] = {
+    try {
+      val completed = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("status", "EVALCOMPLETED")).
+        addSort("startTime", SortOrder.DESC)
+      ESUtils.getAll[EvaluationInstance](client, completed)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  /** Sends the serialized `i` as the update document for id `i.id`; errors are logged. */
+  def update(i: EvaluationInstance): Unit = {
+    try {
+      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+
+  /** Deletes the document stored under `id`; errors are logged. */
+  def delete(id: String): Unit = {
+    try {
+      client.prepareDelete(index, estype, id).get
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala
new file mode 100644
index 0000000..80247ec
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+
+/** Sequence generator built on Elasticsearch document versions (see genNext). */
+class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging {
+  implicit val formats = DefaultFormats
+  private val estype = "sequences"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    // val settingsJson =
+    //   ("number_of_shards" -> 1) ~
+    //   ("auto_expand_replicas" -> "0-all")
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val typeMapping =
+      ("_source" -> ("enabled" -> 0)) ~
+      ("_all" -> ("enabled" -> 0)) ~
+      ("_type" -> ("index" -> "no")) ~
+      ("enabled" -> 0)
+    val mappingSource = compact(render(estype -> typeMapping))
+    indices.preparePutMapping(index).setType(estype).setSource(mappingSource).get
+  }
+
+  /** Indexes document `name` and returns the response's version as an Int; 0 on ES error. */
+  def genNext(name: String): Int = {
+    try {
+      val body = compact(render("n" -> name))
+      client.prepareIndex(index, estype, name).setSource(body).get.getVersion().toInt
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        0
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala
new file mode 100644
index 0000000..5de2999
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import org.elasticsearch.action.search.SearchRequestBuilder
+import org.elasticsearch.client.Client
+import org.elasticsearch.common.unit.TimeValue
+import org.json4s.Formats
+import org.json4s.native.Serialization.read
+
+import scala.collection.mutable.ArrayBuffer
+
+object ESUtils {
+  val scrollLife = new TimeValue(60000)
+
+  /** Runs `builder` as a scroll search and reads each hit's source as `T`,
+   * stopping at the first page that contains no hits. */
+  def getAll[T : Manifest](
+      client: Client,
+      builder: SearchRequestBuilder)(
+      implicit formats: Formats): Seq[T] = {
+    val collected = ArrayBuffer[T]()
+    var page = builder.setScroll(scrollLife).get
+    var batch = page.getHits().hits()
+    while (batch.nonEmpty) {
+      batch.foreach(hit => collected += read[T](hit.getSourceAsString))
+      page = client.prepareSearchScroll(page.getScrollId).setScroll(scrollLife).get
+      batch = page.getHits().hits()
+    }
+    collected
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
new file mode 100644
index 0000000..6f6b1c9
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.transport.TransportClient
+import org.elasticsearch.common.settings.ImmutableSettings
+import org.elasticsearch.common.transport.InetSocketTransportAddress
+import org.elasticsearch.transport.ConnectTransportException
+
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient with Logging {
+  override val prefix = "ES"
+  // Transport client built from the HOSTS/PORTS/CLUSTERNAME properties. Entries are
+  // trimmed so "a, b" lists work; hosts/ports pair by position (zip drops extras).
+  val client = try {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq.map(_.trim)).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.trim.toInt)).getOrElse(Seq(9300))
+    val settings = ImmutableSettings.settingsBuilder()
+      .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch"))
+    val transportClient = new TransportClient(settings)
+    (hosts zip ports) foreach { case (host, port) =>
+      transportClient.addTransportAddress(new InetSocketTransportAddress(host, port))
+    }
+    transportClient
+  } catch {
+    case e: ConnectTransportException =>
+      throw new StorageClientException(e.getMessage, e)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
new file mode 100644
index 0000000..d6aa24a
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage
+
+/** Elasticsearch 1.x implementation of storage traits, supporting metadata only
+ *
+ * @group Implementation
+ */
+package object elasticsearch1 {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 885073a..a8d730b 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -22,6 +22,9 @@ object PIOBuild extends Build {
val elasticsearchVersion = SettingKey[String](
"elasticsearch-version",
"The version of Elasticsearch used for building.")
+ val elasticsearch1Version = SettingKey[String](
+ "elasticsearch1-version",
+ "The version of Elasticsearch 1.x used for building.")
val json4sVersion = SettingKey[String](
"json4s-version",
"The version of JSON4S used for building.")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/tests/Dockerfile
----------------------------------------------------------------------
diff --git a/tests/Dockerfile b/tests/Dockerfile
index 1f87554..0fe7187 100644
--- a/tests/Dockerfile
+++ b/tests/Dockerfile
@@ -18,6 +18,7 @@
from ubuntu
ENV SPARK_VERSION 1.4.0
+#ENV ELASTICSEARCH_VERSION 1.4.4
ENV ELASTICSEARCH_VERSION 5.1.2
ENV HBASE_VERSION 1.0.0
@@ -48,6 +49,7 @@ RUN rm spark-${SPARK_VERSION}-bin-hadoop2.6.tgz
ENV SPARK_HOME /vendors/spark-${SPARK_VERSION}-bin-hadoop2.6
RUN echo "== Installing Elasticsearch =="
+#RUN wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
RUN wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
RUN tar zxvfC elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz /vendors
RUN rm elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/tests/docker-files/env-conf/pio-env.sh
----------------------------------------------------------------------
diff --git a/tests/docker-files/env-conf/pio-env.sh b/tests/docker-files/env-conf/pio-env.sh
index 7ea2164..aa18ff8 100644
--- a/tests/docker-files/env-conf/pio-env.sh
+++ b/tests/docker-files/env-conf/pio-env.sh
@@ -84,11 +84,17 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio
# PIO_STORAGE_SOURCES_MYSQL_PASSWORD=pio
# Elasticsearch Example
-PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch
-PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost
-PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
-PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
-PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$ELASTICSEARCH_HOME
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2
+# Elasticsearch 1.x Example
+PIO_STORAGE_SOURCES_ELASTICSEARCH1_TYPE=elasticsearch1
+#PIO_STORAGE_SOURCES_ELASTICSEARCH1_CLUSTERNAME=pio
+PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOSTS=localhost
+PIO_STORAGE_SOURCES_ELASTICSEARCH1_PORTS=9300
+PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME=$ELASTICSEARCH_HOME
# Local File System Example
PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/tools/build.sbt
----------------------------------------------------------------------
diff --git a/tools/build.sbt b/tools/build.sbt
index 4e2b266..108e2fb 100644
--- a/tools/build.sbt
+++ b/tools/build.sbt
@@ -42,7 +42,6 @@ dependencyOverrides += "org.slf4j" % "slf4j-log4j12" % "1.7.18"
assemblyMergeStrategy in assembly := {
case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat
- case PathList("org", "joda", "time", "base", "BaseDateTime.class") => MergeStrategy.first
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)