Posted to commits@predictionio.apache.org by do...@apache.org on 2017/02/11 22:00:09 UTC
[02/10] incubator-predictionio git commit: Add support for Elasticsearch 5.x
Add support for Elasticsearch 5.x
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/36b79d7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/36b79d7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/36b79d7d
Branch: refs/heads/feature/es5
Commit: 36b79d7d029d086d14b74c675308fca948c6aa85
Parents: 09d04ed
Author: takahiro-hagino <ta...@bizreach.co.jp>
Authored: Mon Jan 16 11:50:48 2017 +0900
Committer: takahiro-hagino <ta...@bizreach.co.jp>
Committed: Mon Jan 16 13:41:44 2017 +0900
----------------------------------------------------------------------
bin/install.sh | 4 +-
bin/pio-start-all | 94 ++++---
bin/pio-stop-all | 50 ++--
build.sbt | 4 +-
conf/pio-env.sh.template | 8 +-
core/build.sbt | 40 +--
.../predictionio/workflow/CreateWorkflow.scala | 74 ++---
data/build.sbt | 49 ++--
.../storage/elasticsearch/ESAccessKeys.scala | 140 ++++++----
.../data/storage/elasticsearch/ESApps.scala | 162 +++++++----
.../data/storage/elasticsearch/ESChannels.scala | 132 +++++----
.../elasticsearch/ESEngineInstances.scala | 233 ++++++++++------
.../elasticsearch/ESEngineManifests.scala | 111 +++++---
.../elasticsearch/ESEvaluationInstances.scala | 176 +++++++-----
.../storage/elasticsearch/ESEventsUtil.scala | 125 +++++++++
.../data/storage/elasticsearch/ESLEvents.scala | 269 +++++++++++++++++++
.../data/storage/elasticsearch/ESPEvents.scala | 151 +++++++++++
.../storage/elasticsearch/ESSequences.scala | 69 ++---
.../data/storage/elasticsearch/ESUtils.scala | 159 +++++++++--
.../storage/elasticsearch/StorageClient.scala | 24 +-
tools/build.sbt | 4 +-
21 files changed, 1518 insertions(+), 560 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/bin/install.sh
----------------------------------------------------------------------
diff --git a/bin/install.sh b/bin/install.sh
index f17cde5..e4fe220 100755
--- a/bin/install.sh
+++ b/bin/install.sh
@@ -19,9 +19,9 @@
OS=`uname`
PIO_VERSION=0.11.0-SNAPSHOT
-SPARK_VERSION=1.6.2
+SPARK_VERSION=1.6.3
# Looks like support for Elasticsearch 2.0 will require 2.0 so deferring
-ELASTICSEARCH_VERSION=1.7.5
+ELASTICSEARCH_VERSION=5.1.2
HBASE_VERSION=1.2.2
POSTGRES_VERSION=9.4-1204.jdbc41
MYSQL_VERSION=5.1.37
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/bin/pio-start-all
----------------------------------------------------------------------
diff --git a/bin/pio-start-all b/bin/pio-start-all
index a78b0d2..03e10ae 100755
--- a/bin/pio-start-all
+++ b/bin/pio-start-all
@@ -25,63 +25,73 @@ export PIO_HOME="$(cd `dirname $0`/..; pwd)"
. ${PIO_HOME}/bin/load-pio-env.sh
+SOURCE_TYPE=$PIO_STORAGE_REPOSITORIES_METADATA_SOURCE
+SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE
+SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE
+
# Elasticsearch
-echo "Starting Elasticsearch..."
-if [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME" ]; then
- if [ -n "$JAVA_HOME" ]; then
- JPS=`$JAVA_HOME/bin/jps`
+if [ `echo $SOURCE_TYPE | grep -i elasticsearch | wc -l` != 0 ] ; then
+ echo "Starting Elasticsearch..."
+ if [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME" ]; then
+ if [ -n "$JAVA_HOME" ]; then
+ JPS=`$JAVA_HOME/bin/jps`
+ else
+ JPS=`jps`
+ fi
+ if [[ ${JPS} =~ "Elasticsearch" ]]; then
+ echo -e "\033[0;31mElasticsearch is already running. Please use pio-stop-all to try stopping it first.\033[0m"
+ echo -e "\033[0;31mNote: If you started Elasticsearch manually, you will need to kill it manually.\033[0m"
+ echo -e "\033[0;31mAborting...\033[0m"
+ exit 1
+ else
+ $PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME/bin/elasticsearch -d -p $PIO_HOME/es.pid
+ fi
else
- JPS=`jps`
- fi
- if [[ ${JPS} =~ "Elasticsearch" ]]; then
- echo -e "\033[0;31mElasticsearch is already running. Please use pio-stop-all to try stopping it first.\033[0m"
- echo -e "\033[0;31mNote: If you started Elasticsearch manually, you will need to kill it manually.\033[0m"
- echo -e "\033[0;31mAborting...\033[0m"
+ echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME in conf/pio-env.sh, or in your environment.\033[0m"
+ echo -e "\033[0;31mCannot start Elasticsearch. Aborting...\033[0m"
exit 1
- else
- $PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME/bin/elasticsearch -d -p $PIO_HOME/es.pid
fi
-else
- echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME in conf/pio-env.sh, or in your environment.\033[0m"
- echo -e "\033[0;31mCannot start Elasticsearch. Aborting...\033[0m"
- exit 1
fi
# HBase
-echo "Starting HBase..."
-if [ -n "$PIO_STORAGE_SOURCES_HBASE_HOME" ]; then
- $PIO_STORAGE_SOURCES_HBASE_HOME/bin/start-hbase.sh
-else
- echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_HBASE_HOME in conf/pio-env.sh, or in your environment.\033[0m"
- # Kill everything for cleanliness
- echo -e "\033[0;31mCannot start HBase. Aborting...\033[0m"
- sleep 3
- ${PIO_HOME}/bin/pio-stop-all
- exit 1
+if [ `echo $SOURCE_TYPE | grep -i hbase | wc -l` != 0 ] ; then
+ echo "Starting HBase..."
+ if [ -n "$PIO_STORAGE_SOURCES_HBASE_HOME" ]; then
+ $PIO_STORAGE_SOURCES_HBASE_HOME/bin/start-hbase.sh
+ else
+ echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_HBASE_HOME in conf/pio-env.sh, or in your environment.\033[0m"
+ # Kill everything for cleanliness
+ echo -e "\033[0;31mCannot start HBase. Aborting...\033[0m"
+ sleep 3
+ ${PIO_HOME}/bin/pio-stop-all
+ exit 1
+ fi
fi
#PGSQL
-pgsqlStatus="$(ps auxwww | grep postgres | wc -l)"
-if [[ "$pgsqlStatus" < 5 ]]; then
- # Detect OS
- OS=`uname`
- if [[ "$OS" = "Darwin" ]]; then
- pg_cmd=`which pg_ctl`
- if [[ "$pg_cmd" != "" ]]; then
- pg_ctl -D /usr/local/var/postgres -l /usr/local/var/postgres/server.log start
+if [ `echo $SOURCE_TYPE | grep -i pgsql | wc -l` != 0 ] ; then
+ pgsqlStatus="$(ps auxwww | grep postgres | wc -l)"
+ if [[ "$pgsqlStatus" < 5 ]]; then
+ # Detect OS
+ OS=`uname`
+ if [[ "$OS" = "Darwin" ]]; then
+ pg_cmd=`which pg_ctl`
+ if [[ "$pg_cmd" != "" ]]; then
+ pg_ctl -D /usr/local/var/postgres -l /usr/local/var/postgres/server.log start
+ fi
+ elif [[ "$OS" = "Linux" ]]; then
+ sudo service postgresql start
+ else
+ echo -e "\033[1;31mYour OS $OS is not yet supported for automatic postgresql startup:(\033[0m"
+ echo -e "\033[1;31mPlease do a manual startup!\033[0m"
+ ${PIO_HOME}/bin/pio-stop-all
+ exit 1
fi
- elif [[ "$OS" = "Linux" ]]; then
- sudo service postgresql start
- else
- echo -e "\033[1;31mYour OS $OS is not yet supported for automatic postgresql startup:(\033[0m"
- echo -e "\033[1;31mPlease do a manual startup!\033[0m"
- ${PIO_HOME}/bin/pio-stop-all
- exit 1
fi
fi
# PredictionIO Event Server
-echo "Waiting 10 seconds for HBase to fully initialize..."
+echo "Waiting 10 seconds for HBase/Elasticsearch to fully initialize..."
sleep 10
echo "Starting PredictionIO Event Server..."
${PIO_HOME}/bin/pio-daemon ${PIO_HOME}/eventserver.pid eventserver --ip 0.0.0.0
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/bin/pio-stop-all
----------------------------------------------------------------------
diff --git a/bin/pio-stop-all b/bin/pio-stop-all
index 4aab5a3..dabad5d 100755
--- a/bin/pio-stop-all
+++ b/bin/pio-stop-all
@@ -25,6 +25,10 @@ export PIO_HOME="$(cd `dirname $0`/..; pwd)"
. ${PIO_HOME}/bin/load-pio-env.sh
+SOURCE_TYPE=$PIO_STORAGE_REPOSITORIES_METADATA_SOURCE
+SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE
+SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE
+
# PredictionIO Event Server
echo "Stopping PredictionIO Event Server..."
PIDFILE=${PIO_HOME}/eventserver.pid
@@ -34,30 +38,38 @@ if [ -e ${PIDFILE} ]; then
fi
# HBase
-echo "Stopping HBase..."
-if [ -n "$PIO_STORAGE_SOURCES_HBASE_HOME" ]; then
- $PIO_STORAGE_SOURCES_HBASE_HOME/bin/stop-hbase.sh
+if [ `echo $SOURCE_TYPE | grep -i hbase | wc -l` != 0 ] ; then
+ echo "Stopping HBase..."
+ if [ -n "$PIO_STORAGE_SOURCES_HBASE_HOME" ]; then
+ $PIO_STORAGE_SOURCES_HBASE_HOME/bin/stop-hbase.sh
+ fi
fi
# Elasticsearch
-echo "Stopping Elasticsearch..."
-PIDFILE=${PIO_HOME}/es.pid
-if [ -e ${PIDFILE} ]; then
- cat ${PIDFILE} | xargs kill
- rm ${PIDFILE}
+if [ `echo $SOURCE_TYPE | grep -i elasticsearch | wc -l` != 0 ] ; then
+ echo "Stopping Elasticsearch..."
+ PIDFILE=${PIO_HOME}/es.pid
+ if [ -e ${PIDFILE} ]; then
+ cat ${PIDFILE} | xargs kill
+ rm ${PIDFILE}
+ fi
fi
#PGSQL
-OS=`uname`
-if [[ "$OS" = "Darwin" ]]; then
- pg_cmd=`which pg_ctl`
- if [[ "$pg_cmd" != "" ]]; then
- pg_ctl -D /usr/local/var/postgres stop -s -m fast
+if [ `echo $SOURCE_TYPE | grep -i pgsql | wc -l` != 0 ] ; then
+ if [ -n "$PIO_STORAGE_SOURCES_PGSQL_TYPE" ]; then
+ OS=`uname`
+ if [[ "$OS" = "Darwin" ]]; then
+ pg_cmd=`which pg_ctl`
+ if [[ "$pg_cmd" != "" ]]; then
+ pg_ctl -D /usr/local/var/postgres stop -s -m fast
+ fi
+ elif [[ "$OS" = "Linux" ]]; then
+ sudo service postgresql stop
+ else
+ echo -e "\033[1;31mYour OS $OS is not yet supported for automatic postgresql startup:(\033[0m"
+ echo -e "\033[1;31mPlease do a manual shutdown!\033[0m"
+ exit 1
+ fi
fi
-elif [[ "$OS" = "Linux" ]]; then
- sudo service postgresql stop
-else
- echo -e "\033[1;31mYour OS $OS is not yet supported for automatic postgresql startup:(\033[0m"
- echo -e "\033[1;31mPlease do a manual shutdown!\033[0m"
- exit 1
fi
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 21c31c6..4d6a5b6 100644
--- a/build.sbt
+++ b/build.sbt
@@ -34,11 +34,11 @@ fork in (ThisBuild, run) := true
javacOptions in (ThisBuild, compile) ++= Seq("-source", "1.7", "-target", "1.7",
"-Xlint:deprecation", "-Xlint:unchecked")
-elasticsearchVersion in ThisBuild := "1.4.4"
+elasticsearchVersion in ThisBuild := "5.1.2"
json4sVersion in ThisBuild := "3.2.10"
-sparkVersion in ThisBuild := "1.4.0"
+sparkVersion in ThisBuild := "1.6.3"
lazy val pioBuildInfoSettings = buildInfoSettings ++ Seq(
sourceGenerators in Compile <+= buildInfo,
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/conf/pio-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template
index a06cd8e..a2841e3 100644
--- a/conf/pio-env.sh.template
+++ b/conf/pio-env.sh.template
@@ -24,7 +24,7 @@
# you need to change these to fit your site.
# SPARK_HOME: Apache Spark is a hard dependency and must be configured.
-SPARK_HOME=$PIO_HOME/vendors/spark-1.5.1-bin-hadoop2.6
+SPARK_HOME=$PIO_HOME/vendors/spark-1.6.3-bin-hadoop2.6
POSTGRES_JDBC_DRIVER=$PIO_HOME/lib/postgresql-9.4-1204.jdbc41.jar
MYSQL_JDBC_DRIVER=$PIO_HOME/lib/mysql-connector-java-5.1.37.jar
@@ -85,10 +85,10 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio
# Elasticsearch Example
# PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch
-# PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=<elasticsearch_cluster_name>
# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost
-# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9300
-# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-1.4.4
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2
# Local File System Example
# PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs
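
The Elasticsearch block above switches from the 1.x transport settings (cluster name, port 9300) to the 5.x REST endpoint (port 9200 over http). A minimal sketch, assuming the template defaults, of what opening a connection with the new low-level REST client looks like (illustrative only, not the commit's StorageClient verbatim):

    import org.apache.http.HttpHost
    import org.elasticsearch.client.RestClient

    // Point the low-level REST client at the host/port/scheme configured by
    // PIO_STORAGE_SOURCES_ELASTICSEARCH_{HOSTS,PORTS,SCHEMES}.
    val client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()
    try {
      // ... client.performRequest(...) calls go here ...
    } finally {
      client.close() // the client pools HTTP connections; always close it
    }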
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/core/build.sbt
----------------------------------------------------------------------
diff --git a/core/build.sbt b/core/build.sbt
index 637d4ea..abd8e07 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -18,27 +18,27 @@
name := "apache-predictionio-core"
libraryDependencies ++= Seq(
- "com.github.scopt" %% "scopt" % "3.3.0",
- "com.google.code.gson" % "gson" % "2.5",
- "com.google.guava" % "guava" % "18.0",
- "com.twitter" %% "chill" % "0.7.2"
+ "com.github.scopt" %% "scopt" % "3.3.0",
+ "com.google.code.gson" % "gson" % "2.5",
+ "com.google.guava" % "guava" % "18.0",
+ "com.twitter" %% "chill" % "0.7.2"
exclude("com.esotericsoftware.minlog", "minlog"),
- "com.twitter" %% "chill-bijection" % "0.7.2",
- "de.javakaffee" % "kryo-serializers" % "0.37",
- "commons-io" % "commons-io" % "2.4",
- "io.spray" %% "spray-can" % "1.3.3",
- "io.spray" %% "spray-routing" % "1.3.3",
- "net.jodah" % "typetools" % "0.3.1",
- "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
- "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
- "org.clapper" %% "grizzled-slf4j" % "1.0.2",
- "org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value,
- "org.json4s" %% "json4s-native" % json4sVersion.value,
- "org.json4s" %% "json4s-ext" % json4sVersion.value,
- "org.scalaj" %% "scalaj-http" % "1.1.6",
- "org.scalatest" %% "scalatest" % "2.1.7" % "test",
- "org.slf4j" % "slf4j-log4j12" % "1.7.18",
- "org.specs2" %% "specs2" % "2.3.13" % "test")
+ "com.twitter" %% "chill-bijection" % "0.7.2",
+ "de.javakaffee" % "kryo-serializers" % "0.37",
+ "commons-io" % "commons-io" % "2.4",
+ "io.spray" %% "spray-can" % "1.3.3",
+ "io.spray" %% "spray-routing" % "1.3.3",
+ "net.jodah" % "typetools" % "0.3.1",
+ "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
+ "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
+ "org.clapper" %% "grizzled-slf4j" % "1.0.2",
+ "org.elasticsearch.client" % "rest" % elasticsearchVersion.value,
+ "org.json4s" %% "json4s-native" % json4sVersion.value,
+ "org.json4s" %% "json4s-ext" % json4sVersion.value,
+ "org.scalaj" %% "scalaj-http" % "1.1.6",
+ "org.scalatest" %% "scalatest" % "2.1.7" % "test",
+ "org.slf4j" % "slf4j-log4j12" % "1.7.18",
+ "org.specs2" %% "specs2" % "2.3.13" % "test")
//testOptions := Seq(Tests.Filter(s => Seq("Dev").exists(s.contains(_))))
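
The key dependency change here: the full `elasticsearch` artifact (which ships the 1.x transport client inside the server jar) gives way to the standalone low-level REST client:

    // before: ES 1.x transport client, bundled with the server jar
    "org.elasticsearch"        % "elasticsearch" % elasticsearchVersion.value
    // after: HTTP-only low-level REST client, new in ES 5.x
    "org.elasticsearch.client" % "rest"          % elasticsearchVersion.value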
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
index 899ace2..edfc1b6 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
@@ -223,36 +223,40 @@ object CreateWorkflow extends Logging {
engineFactoryObj.engineParams(wfc.engineParamsKey)
}
- val engineInstance = EngineInstance(
- id = "",
- status = "INIT",
- startTime = DateTime.now,
- endTime = DateTime.now,
- engineId = wfc.engineId,
- engineVersion = wfc.engineVersion,
- engineVariant = variantId,
- engineFactory = engineFactory,
- batch = wfc.batch,
- env = pioEnvVars,
- sparkConf = workflowParams.sparkEnv,
- dataSourceParams =
- JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
- preparatorParams =
- JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
- algorithmsParams =
- JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
- servingParams =
- JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
+ try {
+ val engineInstance = EngineInstance(
+ id = "",
+ status = "INIT",
+ startTime = DateTime.now,
+ endTime = DateTime.now,
+ engineId = wfc.engineId,
+ engineVersion = wfc.engineVersion,
+ engineVariant = variantId,
+ engineFactory = engineFactory,
+ batch = wfc.batch,
+ env = pioEnvVars,
+ sparkConf = workflowParams.sparkEnv,
+ dataSourceParams =
+ JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
+ preparatorParams =
+ JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
+ algorithmsParams =
+ JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
+ servingParams =
+ JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
- val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
- engineInstance)
+ val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
+ engineInstance)
- CoreWorkflow.runTrain(
- env = pioEnvVars,
- params = workflowParams,
- engine = trainableEngine,
- engineParams = engineParams,
- engineInstance = engineInstance.copy(id = engineInstanceId))
+ CoreWorkflow.runTrain(
+ env = pioEnvVars,
+ params = workflowParams,
+ engine = trainableEngine,
+ engineParams = engineParams,
+ engineInstance = engineInstance.copy(id = engineInstanceId))
+ } finally {
+ Storage.getLEvents().close()
+ }
} else {
val workflowParams = WorkflowParams(
verbose = wfc.verbosity,
@@ -267,11 +271,15 @@ object CreateWorkflow extends Logging {
env = pioEnvVars,
sparkConf = workflowParams.sparkEnv
)
- Workflow.runEvaluation(
- evaluation = evaluation.get,
- engineParamsGenerator = engineParamsGenerator.get,
- evaluationInstance = evaluationInstance,
- params = workflowParams)
+ try {
+ Workflow.runEvaluation(
+ evaluation = evaluation.get,
+ engineParamsGenerator = engineParamsGenerator.get,
+ evaluationInstance = evaluationInstance,
+ params = workflowParams)
+ } finally {
+ Storage.getLEvents().close()
+ }
}
}
}
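
Both branches of CreateWorkflow now release the events client when the run ends; the REST client keeps pooled HTTP connections open, which can otherwise outlive the training run. The shape of the change, as a minimal sketch (runWork is a hypothetical stand-in for the calls above):

    try {
      runWork() // stands in for CoreWorkflow.runTrain / Workflow.runEvaluation
    } finally {
      Storage.getLEvents().close() // always release the storage client, even on failure
    }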
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/build.sbt
----------------------------------------------------------------------
diff --git a/data/build.sbt b/data/build.sbt
index 4526c39..4ae9b42 100644
--- a/data/build.sbt
+++ b/data/build.sbt
@@ -18,41 +18,44 @@
name := "apache-predictionio-data"
libraryDependencies ++= Seq(
- "com.github.nscala-time" %% "nscala-time" % "2.6.0",
- "commons-codec" % "commons-codec" % "1.9",
- "io.spray" %% "spray-can" % "1.3.3",
- "io.spray" %% "spray-routing" % "1.3.3",
- "io.spray" %% "spray-testkit" % "1.3.3" % "test",
- "mysql" % "mysql-connector-java" % "5.1.37" % "optional",
- "org.apache.hadoop" % "hadoop-common" % "2.6.2"
+ "com.github.nscala-time" %% "nscala-time" % "2.6.0",
+ "commons-codec" % "commons-codec" % "1.9",
+ "io.spray" %% "spray-can" % "1.3.3",
+ "io.spray" %% "spray-routing" % "1.3.3",
+ "io.spray" %% "spray-testkit" % "1.3.3" % "test",
+ "mysql" % "mysql-connector-java" % "5.1.37" % "optional",
+ "org.apache.hadoop" % "hadoop-common" % "2.6.2"
exclude("javax.servlet", "servlet-api"),
- "org.apache.hbase" % "hbase-common" % "0.98.5-hadoop2",
- "org.apache.hbase" % "hbase-client" % "0.98.5-hadoop2"
+ "org.apache.hbase" % "hbase-common" % "0.98.5-hadoop2",
+ "org.apache.hbase" % "hbase-client" % "0.98.5-hadoop2"
exclude("org.apache.zookeeper", "zookeeper"),
// added for Parallel storage interface
- "org.apache.hbase" % "hbase-server" % "0.98.5-hadoop2"
+ "org.apache.hbase" % "hbase-server" % "0.98.5-hadoop2"
exclude("org.apache.hbase", "hbase-client")
exclude("org.apache.zookeeper", "zookeeper")
exclude("javax.servlet", "servlet-api")
exclude("org.mortbay.jetty", "servlet-api-2.5")
exclude("org.mortbay.jetty", "jsp-api-2.1")
exclude("org.mortbay.jetty", "jsp-2.1"),
- "org.apache.zookeeper" % "zookeeper" % "3.4.7"
+ "org.apache.zookeeper" % "zookeeper" % "3.4.7"
exclude("org.slf4j", "slf4j-api")
exclude("org.slf4j", "slf4j-log4j12"),
- "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
- "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
- "org.clapper" %% "grizzled-slf4j" % "1.0.2",
- "org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value,
- "org.json4s" %% "json4s-native" % json4sVersion.value,
- "org.json4s" %% "json4s-ext" % json4sVersion.value,
- "org.postgresql" % "postgresql" % "9.4-1204-jdbc41",
- "org.scalatest" %% "scalatest" % "2.1.7" % "test",
- "org.scalikejdbc" %% "scalikejdbc" % "2.3.5",
- "org.slf4j" % "slf4j-log4j12" % "1.7.18",
- "org.spark-project.akka" %% "akka-actor" % "2.3.4-spark",
- "org.specs2" %% "specs2" % "2.3.13" % "test")
+ "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
+ "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
+ "org.clapper" %% "grizzled-slf4j" % "1.0.2",
+ "org.elasticsearch.client" % "rest" % elasticsearchVersion.value,
+ "org.elasticsearch" % "elasticsearch-spark-13_2.10" % elasticsearchVersion.value % "provided",
+ "org.elasticsearch" % "elasticsearch-hadoop-mr" % elasticsearchVersion.value,
+ "org.json4s" %% "json4s-native" % json4sVersion.value,
+ "org.json4s" %% "json4s-ext" % json4sVersion.value,
+ "org.postgresql" % "postgresql" % "9.4-1204-jdbc41",
+ "org.scalatest" %% "scalatest" % "2.1.7" % "test",
+ "org.scalikejdbc" %% "scalikejdbc" % "2.3.5",
+ "org.slf4j" % "slf4j-log4j12" % "1.7.18",
+ "org.spark-project.akka" %% "akka-actor" % "2.3.4-spark",
+ "org.specs2" %% "specs2" % "2.3.13" % "test")
parallelExecution in Test := false
pomExtra := childrenPomExtra.value
+
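
Three Elasticsearch artifacts replace the single server dependency in this module. A plausible reading (the commit message does not spell it out): the REST client backs the metadata and LEvents stores, while the es-hadoop artifacts back the new parallel ESPEvents seen in the diffstat:

    "org.elasticsearch.client" % "rest"                        % elasticsearchVersion.value,              // low-level HTTP client
    "org.elasticsearch"        % "elasticsearch-spark-13_2.10" % elasticsearchVersion.value % "provided", // Spark 1.x connector
    "org.elasticsearch"        % "elasticsearch-hadoop-mr"     % elasticsearchVersion.value               // Hadoop MR input/output formats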
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 077168a..2c69cf4 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -15,45 +15,41 @@
* limitations under the License.
*/
-
package org.apache.predictionio.data.storage.elasticsearch
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.StorageClientConfig
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
import org.apache.predictionio.data.storage.AccessKey
import org.apache.predictionio.data.storage.AccessKeys
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.json4s.JsonDSL._
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
import org.json4s._
+import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
import org.json4s.native.Serialization.write
-import scala.util.Random
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
/** Elasticsearch implementation of AccessKeys. */
-class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
+class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: String)
extends AccessKeys with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "accesskeys"
- val indices = client.admin.indices
- val indexExistResponse = indices.prepareExists(index).get
- if (!indexExistResponse.isExists) {
- indices.prepareCreate(index).get
- }
- val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
- if (!typeExistResponse.isExists) {
- val json =
- (estype ->
- ("properties" ->
- ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
- indices.preparePutMapping(index).setType(estype).
- setSource(compact(render(json))).get
- }
+ ESUtils.createIndex(client, index)
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> 0)) ~
+ ("properties" ->
+ ("key" -> ("type" -> "keyword")) ~
+ ("events" -> ("type" -> "keyword"))))
+ ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
def insert(accessKey: AccessKey): Option[String] = {
val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
@@ -61,59 +57,99 @@ class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
Some(key)
}
- def get(key: String): Option[AccessKey] = {
+ def get(id: String): Option[AccessKey] = {
try {
- val response = client.prepareGet(
- index,
- estype,
- key).get()
- Some(read[AccessKey](response.getSourceAsString))
+ val response = client.performRequest(
+ "GET",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ (jsonResponse \ "found").extract[Boolean] match {
+ case true =>
+ Some((jsonResponse \ "_source").extract[AccessKey])
+ case _ =>
+ None
+ }
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: ResponseException =>
+ e.getResponse.getStatusLine.getStatusCode match {
+ case 404 => None
+ case _ =>
+ error(s"Failed to access to /$index/$estype/$id", e)
+ None
+ }
+ case e: IOException =>
+ error("Failed to access to /$index/$estype/$key", e)
None
- case e: NullPointerException => None
}
}
def getAll(): Seq[AccessKey] = {
try {
- val builder = client.prepareSearch(index).setTypes(estype)
- ESUtils.getAll[AccessKey](client, builder)
+ val json =
+ ("query" ->
+ ("match_all" -> List.empty))
+ ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq[AccessKey]()
+ case e: IOException =>
+ error("Failed to access to /$index/$estype/_search", e)
+ Nil
}
}
def getByAppid(appid: Int): Seq[AccessKey] = {
try {
- val builder = client.prepareSearch(index).setTypes(estype).
- setPostFilter(termFilter("appid", appid))
- ESUtils.getAll[AccessKey](client, builder)
+ val json =
+ ("query" ->
+ ("term" ->
+ ("appid" -> appid)))
+ ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq[AccessKey]()
+ case e: IOException =>
+ error("Failed to access to /$index/$estype/_search", e)
+ Nil
}
}
def update(accessKey: AccessKey): Unit = {
+ val id = accessKey.key
try {
- client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get()
+ val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON)
+ val response = client.performRequest(
+ "POST",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava,
+ entity)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ val result = (jsonResponse \ "result").extract[String]
+ result match {
+ case "created" =>
+ case "updated" =>
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/$id")
+ }
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/$id", e)
}
}
- def delete(key: String): Unit = {
+ def delete(id: String): Unit = {
try {
- client.prepareDelete(index, estype, key).get
+ val response = client.performRequest(
+ "DELETE",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava)
+ val json = parse(EntityUtils.toString(response.getEntity))
+ val result = (json \ "result").extract[String]
+ result match {
+ case "deleted" =>
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/id")
+ }
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/id", e)
}
}
}
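
get, update, and delete above all follow one REST idiom: performRequest, parse the response entity as JSON, branch on "found"/"result", and map a 404 ResponseException to None. The read side as a self-contained sketch (names are illustrative, not the commit's code):

    import scala.collection.JavaConverters.mapAsJavaMapConverter
    import org.apache.http.util.EntityUtils
    import org.elasticsearch.client.{ResponseException, RestClient}
    import org.json4s._
    import org.json4s.native.JsonMethods._

    object ESGetSketch {
      implicit val formats: Formats = DefaultFormats

      // GET /index/type/id and return _source, treating HTTP 404 as "absent".
      def getSource(client: RestClient, index: String, estype: String, id: String): Option[JValue] =
        try {
          val response = client.performRequest(
            "GET", s"/$index/$estype/$id", Map.empty[String, String].asJava)
          val json = parse(EntityUtils.toString(response.getEntity))
          if ((json \ "found").extract[Boolean]) Some(json \ "_source") else None
        } catch {
          case e: ResponseException
              if e.getResponse.getStatusLine.getStatusCode == 404 => None
        }
    }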
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index 3781a4b..7a65379 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -15,116 +15,160 @@
* limitations under the License.
*/
-
package org.apache.predictionio.data.storage.elasticsearch
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.StorageClientConfig
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
import org.apache.predictionio.data.storage.App
import org.apache.predictionio.data.storage.Apps
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.json4s.JsonDSL._
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
import org.json4s._
+import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
import org.json4s.native.Serialization.write
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
/** Elasticsearch implementation of Apps. */
-class ESApps(client: Client, config: StorageClientConfig, index: String)
- extends Apps with Logging {
+class ESApps(client: RestClient, config: StorageClientConfig, index: String)
+ extends Apps with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "apps"
private val seq = new ESSequences(client, config, index)
- val indices = client.admin.indices
- val indexExistResponse = indices.prepareExists(index).get
- if (!indexExistResponse.isExists) {
- indices.prepareCreate(index).get
- }
- val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
- if (!typeExistResponse.isExists) {
- val json =
- (estype ->
- ("properties" ->
- ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
- indices.preparePutMapping(index).setType(estype).
- setSource(compact(render(json))).get
- }
+ ESUtils.createIndex(client, index)
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> 0))~
+ ("properties" ->
+ ("id" -> ("type" -> "keyword")) ~
+ ("name" -> ("type" -> "keyword"))))
+ ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
def insert(app: App): Option[Int] = {
val id =
if (app.id == 0) {
- var roll = seq.genNext("apps")
- while (!get(roll).isEmpty) roll = seq.genNext("apps")
+ var roll = seq.genNext(estype)
+ while (!get(roll).isEmpty) roll = seq.genNext(estype)
roll
- }
- else app.id
- val realapp = app.copy(id = id)
- update(realapp)
+ } else app.id
+ update(app.copy(id = id))
Some(id)
}
def get(id: Int): Option[App] = {
try {
- val response = client.prepareGet(
- index,
- estype,
- id.toString).get()
- Some(read[App](response.getSourceAsString))
+ val response = client.performRequest(
+ "GET",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ (jsonResponse \ "found").extract[Boolean] match {
+ case true =>
+ Some((jsonResponse \ "_source").extract[App])
+ case _ =>
+ None
+ }
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: ResponseException =>
+ e.getResponse.getStatusLine.getStatusCode match {
+ case 404 => None
+ case _ =>
+ error(s"Failed to access to /$index/$estype/$id", e)
+ None
+ }
+ case e: IOException =>
+ error(s"Failed to access to /$index/$estype/$id", e)
None
- case e: NullPointerException => None
}
}
def getByName(name: String): Option[App] = {
try {
- val response = client.prepareSearch(index).setTypes(estype).
- setPostFilter(termFilter("name", name)).get
- val hits = response.getHits().hits()
- if (hits.size > 0) {
- Some(read[App](hits.head.getSourceAsString))
- } else {
- None
+ val json =
+ ("query" ->
+ ("term" ->
+ ("name" -> name)))
+ val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+ val response = client.performRequest(
+ "POST",
+ s"/$index/$estype/_search",
+ Map.empty[String, String].asJava,
+ entity)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ (jsonResponse \ "hits" \ "total").extract[Long] match {
+ case 0 => None
+ case _ =>
+ val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]]
+ val result = (results.head \ "_source").extract[App]
+ Some(result)
}
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: IOException =>
+ error(s"Failed to access to /$index/$estype/_search", e)
None
}
}
def getAll(): Seq[App] = {
try {
- val builder = client.prepareSearch(index).setTypes(estype)
- ESUtils.getAll[App](client, builder)
+ val json =
+ ("query" ->
+ ("match_all" -> List.empty))
+ ESUtils.getAll[App](client, index, estype, compact(render(json)))
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq[App]()
+ case e: IOException =>
+ error("Failed to access to /$index/$estype/_search", e)
+ Nil
}
}
def update(app: App): Unit = {
+ val id = app.id.toString
try {
- val response = client.prepareIndex(index, estype, app.id.toString).
- setSource(write(app)).get()
+ val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON)
+ val response = client.performRequest(
+ "POST",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava,
+ entity)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ val result = (jsonResponse \ "result").extract[String]
+ result match {
+ case "created" =>
+ case "updated" =>
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/$id")
+ }
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/$id", e)
}
}
def delete(id: Int): Unit = {
try {
- client.prepareDelete(index, estype, id.toString).get
+ val response = client.performRequest(
+ "DELETE",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava)
+ val json = parse(EntityUtils.toString(response.getEntity))
+ val result = (json \ "result").extract[String]
+ result match {
+ case "deleted" =>
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/$id")
+ }
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/id", e)
}
}
}
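
getByName shows the search pattern that recurs below: render a json4s query body, POST it to _search, then inspect the hits. A trimmed, self-contained sketch (searchByName is an illustrative name):

    import scala.collection.JavaConverters.mapAsJavaMapConverter
    import org.apache.http.entity.ContentType
    import org.apache.http.nio.entity.NStringEntity
    import org.apache.http.util.EntityUtils
    import org.elasticsearch.client.RestClient
    import org.json4s.JsonDSL._
    import org.json4s.native.JsonMethods._

    // POST a term query (exact match on the "name" keyword field mapped above)
    // and return the raw _search response body.
    def searchByName(client: RestClient, index: String, estype: String, name: String): String = {
      val body = compact(render("query" -> ("term" -> ("name" -> name))))
      val response = client.performRequest(
        "POST", s"/$index/$estype/_search",
        Map.empty[String, String].asJava,
        new NStringEntity(body, ContentType.APPLICATION_JSON))
      EntityUtils.toString(response.getEntity)
    }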
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index 52697fd..c90d668 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -15,102 +15,134 @@
* limitations under the License.
*/
-
package org.apache.predictionio.data.storage.elasticsearch
-import grizzled.slf4j.Logging
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
import org.apache.predictionio.data.storage.Channel
import org.apache.predictionio.data.storage.Channels
import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders.termFilter
-import org.json4s.DefaultFormats
+import org.elasticsearch.client.RestClient
+import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
import org.json4s.native.Serialization.write
-class ESChannels(client: Client, config: StorageClientConfig, index: String)
- extends Channels with Logging {
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
+ extends Channels with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "channels"
private val seq = new ESSequences(client, config, index)
- private val seqName = "channels"
- val indices = client.admin.indices
- val indexExistResponse = indices.prepareExists(index).get
- if (!indexExistResponse.isExists) {
- indices.prepareCreate(index).get
- }
- val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
- if (!typeExistResponse.isExists) {
- val json =
- (estype ->
- ("properties" ->
- ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
- indices.preparePutMapping(index).setType(estype).
- setSource(compact(render(json))).get
- }
+ ESUtils.createIndex(client, index)
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> 0))~
+ ("properties" ->
+ ("name" -> ("type" -> "keyword"))))
+ ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
def insert(channel: Channel): Option[Int] = {
val id =
if (channel.id == 0) {
- var roll = seq.genNext(seqName)
- while (!get(roll).isEmpty) roll = seq.genNext(seqName)
+ var roll = seq.genNext(estype)
+ while (!get(roll).isEmpty) roll = seq.genNext(estype)
roll
} else channel.id
- val realChannel = channel.copy(id = id)
- if (update(realChannel)) Some(id) else None
+ if (update(channel.copy(id = id))) Some(id) else None
}
def get(id: Int): Option[Channel] = {
try {
- val response = client.prepareGet(
- index,
- estype,
- id.toString).get()
- Some(read[Channel](response.getSourceAsString))
+ val response = client.performRequest(
+ "GET",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ (jsonResponse \ "found").extract[Boolean] match {
+ case true =>
+ Some((jsonResponse \ "_source").extract[Channel])
+ case _ =>
+ None
+ }
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: ResponseException =>
+ e.getResponse.getStatusLine.getStatusCode match {
+ case 404 => None
+ case _ =>
+ error(s"Failed to access to /$index/$estype/$id", e)
+ None
+ }
+ case e: IOException =>
+ error(s"Failed to access to /$index/$estype/$id", e)
None
- case e: NullPointerException => None
}
}
def getByAppid(appid: Int): Seq[Channel] = {
try {
- val builder = client.prepareSearch(index).setTypes(estype).
- setPostFilter(termFilter("appid", appid))
- ESUtils.getAll[Channel](client, builder)
+ val json =
+ ("query" ->
+ ("term" ->
+ ("appid" -> appid)))
+ ESUtils.getAll[Channel](client, index, estype, compact(render(json)))
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq[Channel]()
+ case e: IOException =>
+ error(s"Failed to access to /$index/$estype/_search", e)
+ Nil
}
}
def update(channel: Channel): Boolean = {
+ val id = channel.id.toString
try {
- val response = client.prepareIndex(index, estype, channel.id.toString).
- setSource(write(channel)).get()
- true
+ val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON)
+ val response = client.performRequest(
+ "POST",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava,
+ entity)
+ val json = parse(EntityUtils.toString(response.getEntity))
+ val result = (json \ "result").extract[String]
+ result match {
+ case "created" => true
+ case "updated" => true
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/$id")
+ false
+ }
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/$id", e)
false
}
}
def delete(id: Int): Unit = {
try {
- client.prepareDelete(index, estype, id.toString).get
+ val response = client.performRequest(
+ "DELETE",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ val result = (jsonResponse \ "result").extract[String]
+ result match {
+ case "deleted" =>
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/$id")
+ }
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/$id", e)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index 21690bf..08f87f3 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -15,144 +15,211 @@
* limitations under the License.
*/
-
package org.apache.predictionio.data.storage.elasticsearch
-import grizzled.slf4j.Logging
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
import org.apache.predictionio.data.storage.EngineInstance
import org.apache.predictionio.data.storage.EngineInstanceSerializer
import org.apache.predictionio.data.storage.EngineInstances
import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.elasticsearch.search.sort.SortOrder
-import org.json4s.JsonDSL._
+import org.elasticsearch.client.RestClient
import org.json4s._
+import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
import org.json4s.native.Serialization.write
-class ESEngineInstances(client: Client, config: StorageClientConfig, index: String)
- extends EngineInstances with Logging {
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: String)
+ extends EngineInstances with Logging {
implicit val formats = DefaultFormats + new EngineInstanceSerializer
private val estype = "engine_instances"
- val indices = client.admin.indices
- val indexExistResponse = indices.prepareExists(index).get
- if (!indexExistResponse.isExists) {
- indices.prepareCreate(index).get
- }
- val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
- if (!typeExistResponse.isExists) {
- val json =
- (estype ->
- ("properties" ->
- ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("startTime" -> ("type" -> "date")) ~
- ("endTime" -> ("type" -> "date")) ~
- ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("engineVersion" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("engineVariant" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("engineFactory" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("batch" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("dataSourceParams" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("preparatorParams" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("algorithmsParams" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("servingParams" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
- indices.preparePutMapping(index).setType(estype).
- setSource(compact(render(json))).get
- }
+ ESUtils.createIndex(client, index)
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> 0))~
+ ("properties" ->
+ ("status" -> ("type" -> "keyword")) ~
+ ("startTime" -> ("type" -> "date")) ~
+ ("endTime" -> ("type" -> "date")) ~
+ ("engineId" -> ("type" -> "keyword")) ~
+ ("engineVersion" -> ("type" -> "keyword")) ~
+ ("engineVariant" -> ("type" -> "keyword")) ~
+ ("engineFactory" -> ("type" -> "keyword")) ~
+ ("batch" -> ("type" -> "keyword")) ~
+ ("dataSourceParams" -> ("type" -> "keyword")) ~
+ ("preparatorParams" -> ("type" -> "keyword")) ~
+ ("algorithmsParams" -> ("type" -> "keyword")) ~
+ ("servingParams" -> ("type" -> "keyword")) ~
+ ("status" -> ("type" -> "keyword"))))
+ ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
def insert(i: EngineInstance): String = {
+ val id = i.id match {
+ case x if x.isEmpty =>
+ @scala.annotation.tailrec
+ def generateId(newId: Option[String]): String = {
+ newId match {
+ case Some(x) => x
+ case _ => generateId(preInsert())
+ }
+ }
+ generateId(preInsert())
+ case x => x
+ }
+
+ update(i.copy(id = id))
+ id
+ }
+
+ def preInsert(): Option[String] = {
try {
- val response = client.prepareIndex(index, estype).
- setSource(write(i)).get
- response.getId
+ val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
+ val response = client.performRequest(
+ "POST",
+ s"/$index/$estype/",
+ Map.empty[String, String].asJava,
+ entity)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ val result = (jsonResponse \ "result").extract[String]
+ result match {
+ case "created" =>
+ Some((jsonResponse \ "_id").extract[String])
+ case _ =>
+ error(s"[$result] Failed to create $index/$estype")
+ None
+ }
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- ""
+ case e: IOException =>
+ error(s"Failed to create $index/$estype", e)
+ None
}
}
def get(id: String): Option[EngineInstance] = {
try {
- val response = client.prepareGet(index, estype, id).get
- if (response.isExists) {
- Some(read[EngineInstance](response.getSourceAsString))
- } else {
- None
+ val response = client.performRequest(
+ "GET",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ (jsonResponse \ "found").extract[Boolean] match {
+ case true =>
+ Some((jsonResponse \ "_source").extract[EngineInstance])
+ case _ =>
+ None
}
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: ResponseException =>
+ e.getResponse.getStatusLine.getStatusCode match {
+ case 404 => None
+ case _ =>
+ error(s"Failed to access to /$index/$estype/$id", e)
+ None
+ }
+ case e: IOException =>
+ error(s"Failed to access to /$index/$estype/$id", e)
None
}
}
def getAll(): Seq[EngineInstance] = {
try {
- val builder = client.prepareSearch(index).setTypes(estype)
- ESUtils.getAll[EngineInstance](client, builder)
+ val json =
+ ("query" ->
+ ("match_all" -> List.empty))
+ ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq()
+ case e: IOException =>
+ error("Failed to access to /$index/$estype/_search", e)
+ Nil
}
}
def getCompleted(
- engineId: String,
- engineVersion: String,
- engineVariant: String): Seq[EngineInstance] = {
+ engineId: String,
+ engineVersion: String,
+ engineVariant: String): Seq[EngineInstance] = {
try {
- val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
- andFilter(
- termFilter("status", "COMPLETED"),
- termFilter("engineId", engineId),
- termFilter("engineVersion", engineVersion),
- termFilter("engineVariant", engineVariant))).
- addSort("startTime", SortOrder.DESC)
- ESUtils.getAll[EngineInstance](client, builder)
+ val json =
+ ("query" ->
+ ("bool" ->
+ ("must" -> List(
+ ("term" ->
+ ("status" -> "COMPLETED")),
+ ("term" ->
+ ("engineId" -> engineId)),
+ ("term" ->
+ ("engineVersion" -> engineVersion)),
+ ("term" ->
+ ("engineVariant" -> engineVariant)))))) ~
+ ("sort" -> List(
+ ("startTime" ->
+ ("order" -> "desc"))))
+ ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq()
+ case e: IOException =>
+ error(s"Failed to access to /$index/$estype/_search", e)
+ Nil
}
}
def getLatestCompleted(
- engineId: String,
- engineVersion: String,
- engineVariant: String): Option[EngineInstance] =
+ engineId: String,
+ engineVersion: String,
+ engineVariant: String): Option[EngineInstance] =
getCompleted(
engineId,
engineVersion,
engineVariant).headOption
def update(i: EngineInstance): Unit = {
+ val id = i.id
try {
- client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+ val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
+ val response = client.performRequest(
+ "POST",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava,
+ entity)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ val result = (jsonResponse \ "result").extract[String]
+ result match {
+ case "created" =>
+ case "updated" =>
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/$id")
+ }
} catch {
- case e: ElasticsearchException => error(e.getMessage)
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/$id", e)
}
}
def delete(id: String): Unit = {
try {
- val response = client.prepareDelete(index, estype, id).get
+ val response = client.performRequest(
+ "DELETE",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava)
+ val json = parse(EntityUtils.toString(response.getEntity))
+ val result = (json \ "result").extract[String]
+ result match {
+ case "deleted" =>
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/$id")
+ }
} catch {
- case e: ElasticsearchException => error(e.getMessage)
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/$id", e)
}
}
}
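
For reference, the json4s DSL in getCompleted renders to a plain ES 5 search body. A sketch that prints the abbreviated form (one term clause shown):

    import org.json4s.JsonDSL._
    import org.json4s.native.JsonMethods._

    val body =
      ("query" -> ("bool" -> ("must" -> List(
        "term" -> ("status" -> "COMPLETED"))))) ~
      ("sort" -> List("startTime" -> ("order" -> "desc")))
    // prints: {"query":{"bool":{"must":[{"term":{"status":"COMPLETED"}}]}},"sort":[{"startTime":{"order":"desc"}}]}
    println(compact(render(body)))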
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
index 65b6691..a965c71 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
@@ -15,59 +15,96 @@
* limitations under the License.
*/
-
package org.apache.predictionio.data.storage.elasticsearch
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.EngineManifestSerializer
-import org.apache.predictionio.data.storage.StorageClientConfig
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.data.storage.EngineManifestSerializer
import org.apache.predictionio.data.storage.EngineManifests
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
import org.json4s._
-import org.json4s.native.Serialization.read
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
-class ESEngineManifests(client: Client, config: StorageClientConfig, index: String)
- extends EngineManifests with Logging {
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+class ESEngineManifests(client: RestClient, config: StorageClientConfig, index: String)
+ extends EngineManifests with Logging {
implicit val formats = DefaultFormats + new EngineManifestSerializer
private val estype = "engine_manifests"
- private def esid(id: String, version: String) = s"$id $version"
+ private def esid(id: String, version: String) = s"$id-$version"
def insert(engineManifest: EngineManifest): Unit = {
- val json = write(engineManifest)
- val response = client.prepareIndex(
- index,
- estype,
- esid(engineManifest.id, engineManifest.version)).
- setSource(json).execute().actionGet()
+ val id = esid(engineManifest.id, engineManifest.version)
+ try {
+ val entity = new NStringEntity(write(engineManifest), ContentType.APPLICATION_JSON)
+ val response = client.performRequest(
+ "POST",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava,
+ entity)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ val result = (jsonResponse \ "result").extract[String]
+ result match {
+ case "created" =>
+ case "updated" =>
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/$id")
+ }
+ } catch {
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/$id", e)
+ }
}
def get(id: String, version: String): Option[EngineManifest] = {
+ val esId = esid(id, version)
try {
- val response = client.prepareGet(index, estype, esid(id, version)).
- execute().actionGet()
- if (response.isExists) {
- Some(read[EngineManifest](response.getSourceAsString))
- } else {
- None
+ val response = client.performRequest(
+ "GET",
+ s"/$index/$estype/$esId",
+ Map.empty[String, String].asJava)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ (jsonResponse \ "found").extract[Boolean] match {
+ case true =>
+ Some((jsonResponse \ "_source").extract[EngineManifest])
+ case _ =>
+ None
}
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: ResponseException =>
+ e.getResponse.getStatusLine.getStatusCode match {
+ case 404 => None
+ case _ =>
+ error(s"Failed to access to /$index/$estype/$id", e)
+ None
+ }
+ case e: IOException =>
+ error(s"Failed to access to /$index/$estype/$esId", e)
None
}
+
}
def getAll(): Seq[EngineManifest] = {
try {
- val builder = client.prepareSearch()
- ESUtils.getAll[EngineManifest](client, builder)
+ val json =
+ ("query" ->
+ ("match_all" -> List.empty))
+ ESUtils.getAll[EngineManifest](client, index, estype, compact(render(json)))
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq()
+ case e: IOException =>
+ error("Failed to access to /$index/$estype/_search", e)
+ Nil
}
}
@@ -75,10 +112,22 @@ class ESEngineManifests(client: Client, config: StorageClientConfig, index: String)
insert(engineManifest)
def delete(id: String, version: String): Unit = {
+ val esId = esid(id, version)
try {
- client.prepareDelete(index, estype, esid(id, version)).execute().actionGet()
+ val response = client.performRequest(
+ "DELETE",
+ s"/$index/$estype/$esId",
+ Map.empty[String, String].asJava)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ val result = (jsonResponse \ "result").extract[String]
+ result match {
+ case "deleted" =>
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/$esId")
+ }
} catch {
- case e: ElasticsearchException => error(e.getMessage)
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/$esId", e)
}
}
}
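
One easy-to-miss change above: the composite document id separator moves from a space to a dash. With the REST client the id is a path segment of /index/type/{id}, so the dash avoids the URL encoding a space would require (a likely motivation; the commit does not state it):

    // before (transport client API): s"$id $version"
    // after (REST client, id travels in the URL path), mirroring the private helper above:
    def esid(id: String, version: String) = s"$id-$version"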
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 85bf820..0e71f79 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -15,122 +15,160 @@
* limitations under the License.
*/
-
package org.apache.predictionio.data.storage.elasticsearch
-import grizzled.slf4j.Logging
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
import org.apache.predictionio.data.storage.EvaluationInstance
import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
import org.apache.predictionio.data.storage.EvaluationInstances
import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.elasticsearch.search.sort.SortOrder
-import org.json4s.JsonDSL._
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
import org.json4s._
+import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
import org.json4s.native.Serialization.write
-class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String)
- extends EvaluationInstances with Logging {
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, index: String)
+ extends EvaluationInstances with Logging {
implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
private val estype = "evaluation_instances"
+ private val seq = new ESSequences(client, config, index)
- val indices = client.admin.indices
- val indexExistResponse = indices.prepareExists(index).get
- if (!indexExistResponse.isExists) {
- indices.prepareCreate(index).get
- }
- val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
- if (!typeExistResponse.isExists) {
- val json =
- (estype ->
- ("properties" ->
- ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("startTime" -> ("type" -> "date")) ~
- ("endTime" -> ("type" -> "date")) ~
- ("evaluationClass" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("engineParamsGeneratorClass" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("batch" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("evaluatorResults" ->
- ("type" -> "string") ~ ("index" -> "no")) ~
- ("evaluatorResultsHTML" ->
- ("type" -> "string") ~ ("index" -> "no")) ~
- ("evaluatorResultsJSON" ->
- ("type" -> "string") ~ ("index" -> "no"))))
- indices.preparePutMapping(index).setType(estype).
- setSource(compact(render(json))).get
- }
+ ESUtils.createIndex(client, index)
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> 0))~
+ ("properties" ->
+ ("status" -> ("type" -> "keyword")) ~
+ ("startTime" -> ("type" -> "date")) ~
+ ("endTime" -> ("type" -> "date")) ~
+ ("evaluationClass" -> ("type" -> "keyword")) ~
+ ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
+ ("batch" -> ("type" -> "keyword")) ~
+ ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~
+ ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~
+ ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no"))))
+ ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
def insert(i: EvaluationInstance): String = {
- try {
- val response = client.prepareIndex(index, estype).
- setSource(write(i)).get
- response.getId
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- ""
+ val id = i.id match {
+ case x if x.isEmpty =>
+ var roll = seq.genNext(estype).toString
+ while (get(roll).isDefined) roll = seq.genNext(estype).toString
+ roll
+ case x => x
}
+
+ update(i.copy(id = id))
+ id
}
def get(id: String): Option[EvaluationInstance] = {
try {
- val response = client.prepareGet(index, estype, id).get
- if (response.isExists) {
- Some(read[EvaluationInstance](response.getSourceAsString))
- } else {
- None
+ val response = client.performRequest(
+ "GET",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ (jsonResponse \ "found").extract[Boolean] match {
+ case true =>
+ Some((jsonResponse \ "_source").extract[EvaluationInstance])
+ case _ =>
+ None
}
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
+ case e: ResponseException =>
+ e.getResponse.getStatusLine.getStatusCode match {
+ case 404 => None
+ case _ =>
+ error(s"Failed to access to /$index/$estype/$id", e)
+ None
+ }
+ case e: IOException =>
+ error(s"Failed to access to /$index/$estype/$id", e)
None
}
}
def getAll(): Seq[EvaluationInstance] = {
try {
- val builder = client.prepareSearch(index).setTypes(estype)
- ESUtils.getAll[EvaluationInstance](client, builder)
+ val json =
+ ("query" ->
+ ("match_all" -> List.empty))
+ ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq()
+ case e: IOException =>
+ error("Failed to access to /$index/$estype/_search", e)
+ Nil
}
}
def getCompleted(): Seq[EvaluationInstance] = {
try {
- val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
- termFilter("status", "EVALCOMPLETED")).
- addSort("startTime", SortOrder.DESC)
- ESUtils.getAll[EvaluationInstance](client, builder)
+ val json =
+ ("query" ->
+ ("term" ->
+ ("status" -> "EVALCOMPLETED"))) ~
+ ("sort" ->
+ ("startTime" ->
+ ("order" -> "desc")))
+ ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
} catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq()
+ case e: IOException =>
+ error("Failed to access to /$index/$estype/_search", e)
+ Nil
}
}
def update(i: EvaluationInstance): Unit = {
+ val id = i.id
try {
- client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+ val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
+ val response = client.performRequest(
+ "POST",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava,
+ entity)
+ val json = parse(EntityUtils.toString(response.getEntity))
+ val result = (json \ "result").extract[String]
+ result match {
+ case "created" =>
+ case "updated" =>
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/$id")
+ }
} catch {
- case e: ElasticsearchException => error(e.getMessage)
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/$id", e)
}
}
def delete(id: String): Unit = {
try {
- client.prepareDelete(index, estype, id).get
+ val response = client.performRequest(
+ "DELETE",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava)
+ val json = parse(EntityUtils.toString(response.getEntity))
+ val result = (json \ "result").extract[String]
+ result match {
+ case "deleted" =>
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/$id")
+ }
} catch {
- case e: ElasticsearchException => error(e.getMessage)
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/$id", e)
}
}
}
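getCompleted above also shows how the json4s DSL replaces the old QueryBuilder/SortBuilder calls: the DSL expression is rendered into the exact search body that gets POSTed to _search. A small sketch of that rendering, runnable with json4s alone (no cluster required):

    import org.json4s.JsonDSL._
    import org.json4s.native.JsonMethods._

    object QueryBodySketch extends App {
      val json =
        ("query" ->
          ("term" ->
            ("status" -> "EVALCOMPLETED"))) ~
        ("sort" ->
          ("startTime" ->
            ("order" -> "desc")))
      // Prints:
      // {"query":{"term":{"status":"EVALCOMPLETED"}},"sort":{"startTime":{"order":"desc"}}}
      println(compact(render(json)))
    }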
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
new file mode 100644
index 0000000..f2ab7c2
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.MapWritable
+import org.apache.hadoop.io.Text
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+import org.json4s._
+
+object ESEventsUtil {
+
+ implicit val formats = DefaultFormats
+
+ def resultToEvent(id: Text, result: MapWritable, appId: Int): Event = {
+
+ def getStringCol(col: String): String = {
+ val r = result.get(new Text(col)).asInstanceOf[Text]
+ require(r != null,
+ s"Failed to get value for column ${col} " +
+ s"from document ${id}.")
+
+ r.toString()
+ }
+
+ def getOptStringCol(col: String): Option[String] = {
+ val r = result.get(new Text(col))
+ if (r == null) {
+ None
+ } else {
+ Some(r.asInstanceOf[Text].toString())
+ }
+ }
+
+ val tmp = result
+ .get(new Text("properties")).asInstanceOf[MapWritable]
+ .get(new Text("fields")).asInstanceOf[MapWritable]
+ .get(new Text("rating"))
+
+ val rating = tmp match {
+ case d: DoubleWritable => d
+ case l: LongWritable =>
+ new DoubleWritable(l.get().toDouble)
+ case _ => null
+ }
+
+ val properties: DataMap =
+ if (rating != null) DataMap(s"""{"rating":${rating.get().toString}}""")
+ else DataMap()
+
+
+ val eventId = Some(getStringCol("eventId"))
+ val event = getStringCol("event")
+ val entityType = getStringCol("entityType")
+ val entityId = getStringCol("entityId")
+ val targetEntityType = getOptStringCol("targetEntityType")
+ val targetEntityId = getOptStringCol("targetEntityId")
+ val prId = getOptStringCol("prId")
+ val eventTimeZone = getOptStringCol("eventTimeZone")
+ .map(DateTimeZone.forID(_))
+ .getOrElse(EventValidation.defaultTimeZone)
+ val eventTime = new DateTime(
+ getStringCol("eventTime"), eventTimeZone)
+ val creationTimeZone = getOptStringCol("creationTimeZone")
+ .map(DateTimeZone.forID(_))
+ .getOrElse(EventValidation.defaultTimeZone)
+ val creationTime: DateTime = new DateTime(
+ getStringCol("creationTime"), creationTimeZone)
+
+
+ Event(
+ eventId = eventId,
+ event = event,
+ entityType = entityType,
+ entityId = entityId,
+ targetEntityType = targetEntityType,
+ targetEntityId = targetEntityId,
+ properties = properties,
+ eventTime = eventTime,
+ tags = Seq(),
+ prId = prId,
+ creationTime = creationTime
+ )
+ }
+
+ def eventToPut(event: Event, appId: Int): Seq[Map[String, Any]] = {
+ Seq(
+ Map(
+ "eventId" -> event.eventId,
+ "event" -> event.event,
+ "entityType" -> event.entityType,
+ "entityId" -> event.entityId,
+ "targetEntityType" -> event.targetEntityType,
+ "targetEntityId" -> event.targetEntityId,
+ "properties" -> event.properties,
+ "eventTime" -> event.eventTime,
+ "tags" -> event.tags,
+ "prId" -> event.prId,
+ "creationTime" -> event.creationTime
+ )
+ )
+ }
+
+}
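ESEventsUtil bridges es-hadoop's Writable records and PredictionIO Events in both directions. A short usage sketch of eventToPut, assuming Event's defaults for the omitted fields (eventId, tags, prId, creationTime); the field values are made up:

    import org.apache.predictionio.data.storage.{ DataMap, Event }
    import org.joda.time.DateTime

    object EventToPutSketch extends App {
      val event = Event(
        event = "rate",
        entityType = "user",
        entityId = "u0",
        targetEntityType = Some("item"),
        targetEntityId = Some("i0"),
        properties = DataMap("""{"rating": 4.0}"""),
        eventTime = new DateTime())
      // One Map per document, ready for the Spark/es-hadoop writer.
      ESEventsUtil.eventToPut(event, appId = 1).foreach(println)
    }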
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
new file mode 100644
index 0000000..ef25204
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.joda.time.DateTime
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+import org.json4s.ext.JodaTimeSerializers
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String)
+ extends LEvents with Logging {
+ implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
+ private val seq = new ESSequences(client, config, index)
+ private val seqName = "events"
+
+ def getEsType(appId: Int, channelId: Option[Int] = None): String = {
+ channelId.map { ch =>
+ s"${appId}_${ch}"
+ }.getOrElse {
+ s"${appId}"
+ }
+ }
+
+ override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+ val estype = getEsType(appId, channelId)
+ ESUtils.createIndex(client, index)
+ val json =
+ (estype ->
+ ("_all" -> ("enabled" -> 0)) ~
+ ("properties" ->
+ ("name" -> ("type" -> "keyword")) ~
+ ("eventId" -> ("type" -> "keyword")) ~
+ ("event" -> ("type" -> "keyword")) ~
+ ("entityType" -> ("type" -> "keyword")) ~
+ ("entityId" -> ("type" -> "keyword")) ~
+ ("targetEntityType" -> ("type" -> "keyword")) ~
+ ("targetEntityId" -> ("type" -> "keyword")) ~
+ ("properties" ->
+ ("type" -> "nested") ~
+ ("properties" ->
+ ("fields" -> ("type" -> "nested") ~
+ ("properties" ->
+ ("user" -> ("type" -> "long")) ~
+ ("num" -> ("type" -> "long")))))) ~
+ ("eventTime" -> ("type" -> "date")) ~
+ ("tags" -> ("type" -> "keyword")) ~
+ ("prId" -> ("type" -> "keyword")) ~
+ ("creationTime" -> ("type" -> "date"))))
+ ESUtils.createMapping(client, index, estype, compact(render(json)))
+ true
+ }
+
+ override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
+ val estype = getEsType(appId, channelId)
+ try {
+ val json =
+ ("query" ->
+ ("match_all" -> List.empty))
+ val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+ client.performRequest(
+ "POST",
+ s"/$index/$estype/_delete_by_query",
+ Map.empty[String, String].asJava,
+ entity).getStatusLine.getStatusCode match {
+ case 200 => true
+ case _ =>
+ error(s"Failed to remove $index/$estype")
+ false
+ }
+ } catch {
+ case e: Exception =>
+ error(s"Failed to remove $index/$estype", e)
+ false
+ }
+ }
+
+ override def close(): Unit = {
+ try client.close() catch {
+ case e: Exception =>
+ error("Failed to close client.", e)
+ }
+ }
+
+ override def futureInsert(
+ event: Event,
+ appId: Int,
+ channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
+ Future {
+ val estype = getEsType(appId, channelId)
+ val id = event.eventId.getOrElse {
+ var roll = seq.genNext(seqName)
+ while (exists(estype, roll)) roll = seq.genNext(seqName)
+ roll.toString
+ }
+ val json = write(event.copy(eventId = Some(id)))
+ try {
+ val entity = new NStringEntity(json, ContentType.APPLICATION_JSON)
+ val response = client.performRequest(
+ "POST",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava,
+ entity)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ val result = (jsonResponse \ "result").extract[String]
+ result match {
+ case "created" => id
+ case "updated" => id
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype/$id")
+ ""
+ }
+ } catch {
+ case e: IOException =>
+ error(s"Failed to update $index/$estype/$id: $json", e)
+ ""
+ }
+ }
+ }
+
+ private def exists(estype: String, id: Int): Boolean = {
+ try {
+ client.performRequest(
+ "GET",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+ case 200 => true
+ case _ => false
+ }
+ } catch {
+ case e: ResponseException =>
+ e.getResponse.getStatusLine.getStatusCode match {
+ case 404 => false
+ case _ =>
+ error(s"Failed to access to /$index/$estype/$id", e)
+ false
+ }
+ case e: IOException =>
+ error(s"Failed to access to $index/$estype/$id", e)
+ false
+ }
+ }
+
+ override def futureGet(
+ eventId: String,
+ appId: Int,
+ channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
+ Future {
+ val estype = getEsType(appId, channelId)
+ try {
+ val json =
+ ("query" ->
+ ("term" ->
+ ("eventId" -> eventId)))
+ val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+ val response = client.performRequest(
+ "POST",
+ s"/$index/$estype/_search",
+ Map.empty[String, String].asJava,
+ entity)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ (jsonResponse \ "hits" \ "total").extract[Long] match {
+ case 0 => None
+ case _ =>
+ val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]]
+ val result = (results.head \ "_source").extract[Event]
+ Some(result)
+ }
+ } catch {
+ case e: IOException =>
+ error("Failed to access to /$index/$estype/_search", e)
+ None
+ }
+ }
+ }
+
+ override def futureDelete(
+ eventId: String,
+ appId: Int,
+ channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
+ Future {
+ val estype = getEsType(appId, channelId)
+ try {
+ val json =
+ ("query" ->
+ ("term" ->
+ ("eventId" -> eventId)))
+ val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+ val response = client.performRequest(
+ "POST",
+ s"/$index/$estype/_delete_by_query",
+ Map.empty[String, String].asJava, entity)
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ val result = (jsonResponse \ "result").extract[String]
+ result match {
+ case "deleted" => true
+ case _ =>
+ error(s"[$result] Failed to update $index/$estype:$eventId")
+ false
+ }
+ } catch {
+ case e: IOException =>
+ error(s"Failed to update $index/$estype:$eventId", e)
+ false
+ }
+ }
+ }
+
+ override def futureFind(
+ appId: Int,
+ channelId: Option[Int] = None,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ entityType: Option[String] = None,
+ entityId: Option[String] = None,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None,
+ limit: Option[Int] = None,
+ reversed: Option[Boolean] = None)
+ (implicit ec: ExecutionContext): Future[Iterator[Event]] = {
+ Future {
+ val estype = getEsType(appId, channelId)
+ try {
+ val query = ESUtils.createEventQuery(
+ startTime, untilTime, entityType, entityId,
+ eventNames, targetEntityType, targetEntityId, None)
+ ESUtils.getAll[Event](client, index, estype, query).toIterator
+ } catch {
+ case e: IOException =>
+ error(s"Failed to access /$index/$estype/_search", e)
+ Iterator[Event]()
+ }
+ }
+ }
+
+}
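End to end, the new event store can be exercised as below; a minimal sketch assuming a local node on port 9200, StorageClientConfig's no-argument defaults, an illustrative index name, and Event's defaults for its optional fields. Note that a freshly inserted event may need an index refresh before the _search behind futureGet can see it:

    import scala.concurrent.{ Await, ExecutionContext }
    import scala.concurrent.duration._

    import org.apache.http.HttpHost
    import org.apache.predictionio.data.storage.{ Event, StorageClientConfig }
    import org.elasticsearch.client.RestClient

    object LEventsSketch extends App {
      implicit val ec: ExecutionContext = ExecutionContext.global
      val client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()
      val events = new ESLEvents(client, StorageClientConfig(), "pio_event")
      events.init(appId = 1) // creates the index and the per-app mapping
      val event = Event(event = "rate", entityType = "user", entityId = "u0")
      val id = Await.result(
        events.futureInsert(event, appId = 1, channelId = None), 10.seconds)
      println(Await.result(events.futureGet(id, appId = 1, channelId = None), 10.seconds))
      events.close() // also closes the underlying RestClient
    }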