You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/03/08 07:46:01 UTC
[6/7] incubator-predictionio git commit: [PIO-49] Add support for Elasticsearch 5
[PIO-49] Add support for Elasticsearch 5
* Add support for Elasticsearch 5 over REST API
* Refactor storage implementations to submodules
* Build storage implementation as separate assemblies
Closes #352
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/d78b3cbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/d78b3cbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/d78b3cbe
Branch: refs/heads/develop
Commit: d78b3cbe912cf57e2f0278e407a0d6432bd12849
Parents: 8fd59fd
Author: Shinsuke Sugaya <sh...@yahoo.co.jp>
Authored: Tue Mar 7 20:33:30 2017 -0800
Committer: Donald Szeto <do...@apache.org>
Committed: Tue Mar 7 20:33:30 2017 -0800
----------------------------------------------------------------------
bin/compute-classpath.sh | 2 +-
build.sbt | 28 +-
conf/pio-env.sh.template | 10 +-
core/build.sbt | 1 -
data/build.sbt | 14 -
.../storage/elasticsearch/ESAccessKeys.scala | 119 ------
.../data/storage/elasticsearch/ESApps.scala | 130 ------
.../data/storage/elasticsearch/ESChannels.scala | 117 ------
.../elasticsearch/ESEngineInstances.scala | 158 -------
.../elasticsearch/ESEvaluationInstances.scala | 136 ------
.../storage/elasticsearch/ESSequences.scala | 64 ---
.../data/storage/elasticsearch/ESUtils.scala | 48 ---
.../storage/elasticsearch/StorageClient.scala | 50 ---
.../data/storage/elasticsearch/package.scala | 25 --
.../data/storage/hbase/HBEventsUtil.scala | 415 -------------------
.../data/storage/hbase/HBLEvents.scala | 195 ---------
.../data/storage/hbase/HBPEvents.scala | 131 ------
.../data/storage/hbase/PIOHBaseUtil.scala | 32 --
.../data/storage/hbase/StorageClient.scala | 86 ----
.../data/storage/hbase/package.scala | 25 --
.../data/storage/hbase/upgrade/HB_0_8_0.scala | 193 ---------
.../data/storage/hbase/upgrade/Upgrade.scala | 75 ----
.../storage/hbase/upgrade/Upgrade_0_8_3.scala | 224 ----------
.../data/storage/hdfs/HDFSModels.scala | 63 ---
.../data/storage/hdfs/StorageClient.scala | 36 --
.../data/storage/hdfs/package.scala | 25 --
.../data/storage/jdbc/JDBCAccessKeys.scala | 87 ----
.../data/storage/jdbc/JDBCApps.scala | 89 ----
.../data/storage/jdbc/JDBCChannels.scala | 69 ---
.../data/storage/jdbc/JDBCEngineInstances.scala | 197 ---------
.../storage/jdbc/JDBCEvaluationInstances.scala | 165 --------
.../data/storage/jdbc/JDBCLEvents.scala | 244 -----------
.../data/storage/jdbc/JDBCModels.scala | 55 ---
.../data/storage/jdbc/JDBCPEvents.scala | 188 ---------
.../data/storage/jdbc/JDBCUtils.scala | 106 -----
.../data/storage/jdbc/StorageClient.scala | 53 ---
.../data/storage/jdbc/package.scala | 26 --
.../data/storage/localfs/LocalFSModels.scala | 62 ---
.../data/storage/localfs/StorageClient.scala | 46 --
.../data/storage/localfs/package.scala | 25 --
.../predictionio/data/view/PBatchView.scala | 212 ----------
make-distribution.sh | 6 +-
storage/elasticsearch/.gitignore | 1 +
storage/elasticsearch/build.sbt | 57 +++
.../storage/elasticsearch/ESAccessKeys.scala | 178 ++++++++
.../data/storage/elasticsearch/ESApps.scala | 194 +++++++++
.../data/storage/elasticsearch/ESChannels.scala | 165 ++++++++
.../elasticsearch/ESEngineInstances.scala | 248 +++++++++++
.../elasticsearch/ESEvaluationInstances.scala | 194 +++++++++
.../storage/elasticsearch/ESEventsUtil.scala | 123 ++++++
.../data/storage/elasticsearch/ESLEvents.scala | 291 +++++++++++++
.../data/storage/elasticsearch/ESPEvents.scala | 144 +++++++
.../storage/elasticsearch/ESSequences.scala | 79 ++++
.../data/storage/elasticsearch/ESUtils.scala | 184 ++++++++
.../storage/elasticsearch/StorageClient.scala | 44 ++
.../data/storage/elasticsearch/package.scala | 25 ++
.../src/test/resources/application.conf | 28 ++
storage/elasticsearch1/.gitignore | 1 +
storage/elasticsearch1/build.sbt | 47 +++
.../storage/elasticsearch/ESAccessKeys.scala | 119 ++++++
.../data/storage/elasticsearch/ESApps.scala | 130 ++++++
.../data/storage/elasticsearch/ESChannels.scala | 117 ++++++
.../elasticsearch/ESEngineInstances.scala | 158 +++++++
.../elasticsearch/ESEvaluationInstances.scala | 136 ++++++
.../storage/elasticsearch/ESSequences.scala | 64 +++
.../data/storage/elasticsearch/ESUtils.scala | 48 +++
.../storage/elasticsearch/StorageClient.scala | 50 +++
.../data/storage/elasticsearch/package.scala | 25 ++
.../src/test/resources/application.conf | 28 ++
storage/hbase/.gitignore | 1 +
storage/hbase/build.sbt | 56 +++
.../data/storage/hbase/HBEventsUtil.scala | 415 +++++++++++++++++++
.../data/storage/hbase/HBLEvents.scala | 195 +++++++++
.../data/storage/hbase/HBPEvents.scala | 131 ++++++
.../data/storage/hbase/PIOHBaseUtil.scala | 32 ++
.../data/storage/hbase/StorageClient.scala | 86 ++++
.../data/storage/hbase/package.scala | 25 ++
.../data/storage/hbase/upgrade/HB_0_8_0.scala | 193 +++++++++
.../data/storage/hbase/upgrade/Upgrade.scala | 75 ++++
.../storage/hbase/upgrade/Upgrade_0_8_3.scala | 224 ++++++++++
.../predictionio/data/view/PBatchView.scala | 212 ++++++++++
.../hbase/src/test/resources/application.conf | 28 ++
storage/hdfs/.gitignore | 1 +
storage/hdfs/build.sbt | 44 ++
.../data/storage/hdfs/HDFSModels.scala | 63 +++
.../data/storage/hdfs/StorageClient.scala | 36 ++
.../data/storage/hdfs/package.scala | 25 ++
.../hdfs/src/test/resources/application.conf | 28 ++
storage/jdbc/.gitignore | 1 +
storage/jdbc/build.sbt | 47 +++
.../data/storage/jdbc/JDBCAccessKeys.scala | 87 ++++
.../data/storage/jdbc/JDBCApps.scala | 89 ++++
.../data/storage/jdbc/JDBCChannels.scala | 69 +++
.../data/storage/jdbc/JDBCEngineInstances.scala | 197 +++++++++
.../storage/jdbc/JDBCEvaluationInstances.scala | 165 ++++++++
.../data/storage/jdbc/JDBCLEvents.scala | 244 +++++++++++
.../data/storage/jdbc/JDBCModels.scala | 55 +++
.../data/storage/jdbc/JDBCPEvents.scala | 188 +++++++++
.../data/storage/jdbc/JDBCUtils.scala | 106 +++++
.../data/storage/jdbc/StorageClient.scala | 53 +++
.../data/storage/jdbc/package.scala | 26 ++
.../jdbc/src/test/resources/application.conf | 28 ++
storage/localfs/.gitignore | 1 +
storage/localfs/build.sbt | 44 ++
.../data/storage/localfs/LocalFSModels.scala | 62 +++
.../data/storage/localfs/StorageClient.scala | 46 ++
.../data/storage/localfs/package.scala | 25 ++
.../localfs/src/test/resources/application.conf | 28 ++
tests/Dockerfile | 10 +-
tests/build-docker.sh | 8 +-
tests/docker-compose.yml | 2 +-
tests/docker-files/env-conf/pio-env.sh | 3 +-
tests/pio_tests/scenarios/eventserver_test.py | 5 +-
tests/run_docker.sh | 4 +-
.../org/apache/predictionio/tools/Common.scala | 7 +
.../org/apache/predictionio/tools/Runner.scala | 3 +-
.../predictionio/tools/commands/Engine.scala | 4 +-
117 files changed, 6382 insertions(+), 4005 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/bin/compute-classpath.sh
----------------------------------------------------------------------
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index 3bf6814..69cbb25 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -27,7 +27,7 @@ FWDIR="$(cd `dirname $0`/..; pwd)"
# Build up classpath
CLASSPATH="${FWDIR}/conf"
-CLASSPATH="$CLASSPATH:${FWDIR}/plugins/*"
+CLASSPATH="$CLASSPATH:${FWDIR}/plugins/*:${FWDIR}/lib/spark/*"
ASSEMBLY_DIR="${FWDIR}/assembly"
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index eeb3724..98444b9 100644
--- a/build.sbt
+++ b/build.sbt
@@ -34,11 +34,9 @@ fork in (ThisBuild, run) := true
javacOptions in (ThisBuild, compile) ++= Seq("-source", "1.7", "-target", "1.7",
"-Xlint:deprecation", "-Xlint:unchecked")
-elasticsearchVersion in ThisBuild := "1.4.4"
-
json4sVersion in ThisBuild := "3.2.10"
-sparkVersion in ThisBuild := "1.4.0"
+sparkVersion in ThisBuild := "1.6.3"
val pioBuildInfoSettings = buildInfoSettings ++ Seq(
sourceGenerators in Compile <+= buildInfo,
@@ -65,6 +63,30 @@ val data = (project in file("data")).
settings(commonSettings: _*).
settings(genjavadocSettings: _*)
+val dataElasticsearch1 = (project in file("storage/elasticsearch1")).
+ settings(commonSettings: _*).
+ settings(genjavadocSettings: _*)
+
+val dataElasticsearch = (project in file("storage/elasticsearch")).
+ settings(commonSettings: _*).
+ settings(genjavadocSettings: _*)
+
+val dataHbase = (project in file("storage/hbase")).
+ settings(commonSettings: _*).
+ settings(genjavadocSettings: _*)
+
+val dataHdfs = (project in file("storage/hdfs")).
+ settings(commonSettings: _*).
+ settings(genjavadocSettings: _*)
+
+val dataJdbc = (project in file("storage/jdbc")).
+ settings(commonSettings: _*).
+ settings(genjavadocSettings: _*)
+
+val dataLocalfs = (project in file("storage/localfs")).
+ settings(commonSettings: _*).
+ settings(genjavadocSettings: _*)
+
val core = (project in file("core")).
dependsOn(data).
settings(commonSettings: _*).
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/conf/pio-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template
index a06cd8e..0d76102 100644
--- a/conf/pio-env.sh.template
+++ b/conf/pio-env.sh.template
@@ -24,7 +24,7 @@
# you need to change these to fit your site.
# SPARK_HOME: Apache Spark is a hard dependency and must be configured.
-SPARK_HOME=$PIO_HOME/vendors/spark-1.5.1-bin-hadoop2.6
+SPARK_HOME=$PIO_HOME/vendors/spark-1.6.3-bin-hadoop2.6
POSTGRES_JDBC_DRIVER=$PIO_HOME/lib/postgresql-9.4-1204.jdbc41.jar
MYSQL_JDBC_DRIVER=$PIO_HOME/lib/mysql-connector-java-5.1.37.jar
@@ -85,10 +85,16 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio
# Elasticsearch Example
# PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.2.1
+# Elasticsearch 1.x Example
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch
# PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=<elasticsearch_cluster_name>
# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost
# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9300
-# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-1.4.4
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-1.7.6
# Local File System Example
# PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/core/build.sbt
----------------------------------------------------------------------
diff --git a/core/build.sbt b/core/build.sbt
index 637d4ea..bfb8bf3 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -32,7 +32,6 @@ libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.clapper" %% "grizzled-slf4j" % "1.0.2",
- "org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value,
"org.json4s" %% "json4s-native" % json4sVersion.value,
"org.json4s" %% "json4s-ext" % json4sVersion.value,
"org.scalaj" %% "scalaj-http" % "1.1.6",
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/build.sbt
----------------------------------------------------------------------
diff --git a/data/build.sbt b/data/build.sbt
index 4526c39..f5e95b5 100644
--- a/data/build.sbt
+++ b/data/build.sbt
@@ -26,29 +26,15 @@ libraryDependencies ++= Seq(
"mysql" % "mysql-connector-java" % "5.1.37" % "optional",
"org.apache.hadoop" % "hadoop-common" % "2.6.2"
exclude("javax.servlet", "servlet-api"),
- "org.apache.hbase" % "hbase-common" % "0.98.5-hadoop2",
- "org.apache.hbase" % "hbase-client" % "0.98.5-hadoop2"
- exclude("org.apache.zookeeper", "zookeeper"),
- // added for Parallel storage interface
- "org.apache.hbase" % "hbase-server" % "0.98.5-hadoop2"
- exclude("org.apache.hbase", "hbase-client")
- exclude("org.apache.zookeeper", "zookeeper")
- exclude("javax.servlet", "servlet-api")
- exclude("org.mortbay.jetty", "servlet-api-2.5")
- exclude("org.mortbay.jetty", "jsp-api-2.1")
- exclude("org.mortbay.jetty", "jsp-2.1"),
"org.apache.zookeeper" % "zookeeper" % "3.4.7"
exclude("org.slf4j", "slf4j-api")
exclude("org.slf4j", "slf4j-log4j12"),
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.clapper" %% "grizzled-slf4j" % "1.0.2",
- "org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value,
"org.json4s" %% "json4s-native" % json4sVersion.value,
"org.json4s" %% "json4s-ext" % json4sVersion.value,
- "org.postgresql" % "postgresql" % "9.4-1204-jdbc41",
"org.scalatest" %% "scalatest" % "2.1.7" % "test",
- "org.scalikejdbc" %% "scalikejdbc" % "2.3.5",
"org.slf4j" % "slf4j-log4j12" % "1.7.18",
"org.spark-project.akka" %% "akka-actor" % "2.3.4-spark",
"org.specs2" %% "specs2" % "2.3.13" % "test")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
deleted file mode 100644
index 077168a..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.AccessKey
-import org.apache.predictionio.data.storage.AccessKeys
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-import scala.util.Random
-
-/** Elasticsearch implementation of AccessKeys. */
-class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
- extends AccessKeys with Logging {
- implicit val formats = DefaultFormats.lossless
- private val estype = "accesskeys"
-
- val indices = client.admin.indices
- val indexExistResponse = indices.prepareExists(index).get
- if (!indexExistResponse.isExists) {
- indices.prepareCreate(index).get
- }
- val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
- if (!typeExistResponse.isExists) {
- val json =
- (estype ->
- ("properties" ->
- ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
- indices.preparePutMapping(index).setType(estype).
- setSource(compact(render(json))).get
- }
-
- def insert(accessKey: AccessKey): Option[String] = {
- val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
- update(accessKey.copy(key = key))
- Some(key)
- }
-
- def get(key: String): Option[AccessKey] = {
- try {
- val response = client.prepareGet(
- index,
- estype,
- key).get()
- Some(read[AccessKey](response.getSourceAsString))
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- None
- case e: NullPointerException => None
- }
- }
-
- def getAll(): Seq[AccessKey] = {
- try {
- val builder = client.prepareSearch(index).setTypes(estype)
- ESUtils.getAll[AccessKey](client, builder)
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq[AccessKey]()
- }
- }
-
- def getByAppid(appid: Int): Seq[AccessKey] = {
- try {
- val builder = client.prepareSearch(index).setTypes(estype).
- setPostFilter(termFilter("appid", appid))
- ESUtils.getAll[AccessKey](client, builder)
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq[AccessKey]()
- }
- }
-
- def update(accessKey: AccessKey): Unit = {
- try {
- client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get()
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- }
- }
-
- def delete(key: String): Unit = {
- try {
- client.prepareDelete(index, estype, key).get
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
deleted file mode 100644
index 3781a4b..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.App
-import org.apache.predictionio.data.storage.Apps
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-/** Elasticsearch implementation of Items. */
-class ESApps(client: Client, config: StorageClientConfig, index: String)
- extends Apps with Logging {
- implicit val formats = DefaultFormats.lossless
- private val estype = "apps"
- private val seq = new ESSequences(client, config, index)
-
- val indices = client.admin.indices
- val indexExistResponse = indices.prepareExists(index).get
- if (!indexExistResponse.isExists) {
- indices.prepareCreate(index).get
- }
- val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
- if (!typeExistResponse.isExists) {
- val json =
- (estype ->
- ("properties" ->
- ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
- indices.preparePutMapping(index).setType(estype).
- setSource(compact(render(json))).get
- }
-
- def insert(app: App): Option[Int] = {
- val id =
- if (app.id == 0) {
- var roll = seq.genNext("apps")
- while (!get(roll).isEmpty) roll = seq.genNext("apps")
- roll
- }
- else app.id
- val realapp = app.copy(id = id)
- update(realapp)
- Some(id)
- }
-
- def get(id: Int): Option[App] = {
- try {
- val response = client.prepareGet(
- index,
- estype,
- id.toString).get()
- Some(read[App](response.getSourceAsString))
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- None
- case e: NullPointerException => None
- }
- }
-
- def getByName(name: String): Option[App] = {
- try {
- val response = client.prepareSearch(index).setTypes(estype).
- setPostFilter(termFilter("name", name)).get
- val hits = response.getHits().hits()
- if (hits.size > 0) {
- Some(read[App](hits.head.getSourceAsString))
- } else {
- None
- }
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- None
- }
- }
-
- def getAll(): Seq[App] = {
- try {
- val builder = client.prepareSearch(index).setTypes(estype)
- ESUtils.getAll[App](client, builder)
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq[App]()
- }
- }
-
- def update(app: App): Unit = {
- try {
- val response = client.prepareIndex(index, estype, app.id.toString).
- setSource(write(app)).get()
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- }
- }
-
- def delete(id: Int): Unit = {
- try {
- client.prepareDelete(index, estype, id.toString).get
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
deleted file mode 100644
index 52697fd..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.Channel
-import org.apache.predictionio.data.storage.Channels
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders.termFilter
-import org.json4s.DefaultFormats
-import org.json4s.JsonDSL._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-class ESChannels(client: Client, config: StorageClientConfig, index: String)
- extends Channels with Logging {
-
- implicit val formats = DefaultFormats.lossless
- private val estype = "channels"
- private val seq = new ESSequences(client, config, index)
- private val seqName = "channels"
-
- val indices = client.admin.indices
- val indexExistResponse = indices.prepareExists(index).get
- if (!indexExistResponse.isExists) {
- indices.prepareCreate(index).get
- }
- val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
- if (!typeExistResponse.isExists) {
- val json =
- (estype ->
- ("properties" ->
- ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
- indices.preparePutMapping(index).setType(estype).
- setSource(compact(render(json))).get
- }
-
- def insert(channel: Channel): Option[Int] = {
- val id =
- if (channel.id == 0) {
- var roll = seq.genNext(seqName)
- while (!get(roll).isEmpty) roll = seq.genNext(seqName)
- roll
- } else channel.id
-
- val realChannel = channel.copy(id = id)
- if (update(realChannel)) Some(id) else None
- }
-
- def get(id: Int): Option[Channel] = {
- try {
- val response = client.prepareGet(
- index,
- estype,
- id.toString).get()
- Some(read[Channel](response.getSourceAsString))
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- None
- case e: NullPointerException => None
- }
- }
-
- def getByAppid(appid: Int): Seq[Channel] = {
- try {
- val builder = client.prepareSearch(index).setTypes(estype).
- setPostFilter(termFilter("appid", appid))
- ESUtils.getAll[Channel](client, builder)
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq[Channel]()
- }
- }
-
- def update(channel: Channel): Boolean = {
- try {
- val response = client.prepareIndex(index, estype, channel.id.toString).
- setSource(write(channel)).get()
- true
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- false
- }
- }
-
- def delete(id: Int): Unit = {
- try {
- client.prepareDelete(index, estype, id.toString).get
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
deleted file mode 100644
index 21690bf..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.EngineInstance
-import org.apache.predictionio.data.storage.EngineInstanceSerializer
-import org.apache.predictionio.data.storage.EngineInstances
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.elasticsearch.search.sort.SortOrder
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-class ESEngineInstances(client: Client, config: StorageClientConfig, index: String)
- extends EngineInstances with Logging {
- implicit val formats = DefaultFormats + new EngineInstanceSerializer
- private val estype = "engine_instances"
-
- val indices = client.admin.indices
- val indexExistResponse = indices.prepareExists(index).get
- if (!indexExistResponse.isExists) {
- indices.prepareCreate(index).get
- }
- val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
- if (!typeExistResponse.isExists) {
- val json =
- (estype ->
- ("properties" ->
- ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("startTime" -> ("type" -> "date")) ~
- ("endTime" -> ("type" -> "date")) ~
- ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("engineVersion" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("engineVariant" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("engineFactory" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("batch" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("dataSourceParams" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("preparatorParams" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("algorithmsParams" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("servingParams" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
- indices.preparePutMapping(index).setType(estype).
- setSource(compact(render(json))).get
- }
-
- def insert(i: EngineInstance): String = {
- try {
- val response = client.prepareIndex(index, estype).
- setSource(write(i)).get
- response.getId
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- ""
- }
- }
-
- def get(id: String): Option[EngineInstance] = {
- try {
- val response = client.prepareGet(index, estype, id).get
- if (response.isExists) {
- Some(read[EngineInstance](response.getSourceAsString))
- } else {
- None
- }
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- None
- }
- }
-
- def getAll(): Seq[EngineInstance] = {
- try {
- val builder = client.prepareSearch(index).setTypes(estype)
- ESUtils.getAll[EngineInstance](client, builder)
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq()
- }
- }
-
- def getCompleted(
- engineId: String,
- engineVersion: String,
- engineVariant: String): Seq[EngineInstance] = {
- try {
- val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
- andFilter(
- termFilter("status", "COMPLETED"),
- termFilter("engineId", engineId),
- termFilter("engineVersion", engineVersion),
- termFilter("engineVariant", engineVariant))).
- addSort("startTime", SortOrder.DESC)
- ESUtils.getAll[EngineInstance](client, builder)
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq()
- }
- }
-
- def getLatestCompleted(
- engineId: String,
- engineVersion: String,
- engineVariant: String): Option[EngineInstance] =
- getCompleted(
- engineId,
- engineVersion,
- engineVariant).headOption
-
- def update(i: EngineInstance): Unit = {
- try {
- client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
- } catch {
- case e: ElasticsearchException => error(e.getMessage)
- }
- }
-
- def delete(id: String): Unit = {
- try {
- val response = client.prepareDelete(index, estype, id).get
- } catch {
- case e: ElasticsearchException => error(e.getMessage)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
deleted file mode 100644
index 85bf820..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.EvaluationInstance
-import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
-import org.apache.predictionio.data.storage.EvaluationInstances
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.elasticsearch.search.sort.SortOrder
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String)
- extends EvaluationInstances with Logging {
- implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
- private val estype = "evaluation_instances"
-
- val indices = client.admin.indices
- val indexExistResponse = indices.prepareExists(index).get
- if (!indexExistResponse.isExists) {
- indices.prepareCreate(index).get
- }
- val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
- if (!typeExistResponse.isExists) {
- val json =
- (estype ->
- ("properties" ->
- ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("startTime" -> ("type" -> "date")) ~
- ("endTime" -> ("type" -> "date")) ~
- ("evaluationClass" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("engineParamsGeneratorClass" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("batch" ->
- ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
- ("evaluatorResults" ->
- ("type" -> "string") ~ ("index" -> "no")) ~
- ("evaluatorResultsHTML" ->
- ("type" -> "string") ~ ("index" -> "no")) ~
- ("evaluatorResultsJSON" ->
- ("type" -> "string") ~ ("index" -> "no"))))
- indices.preparePutMapping(index).setType(estype).
- setSource(compact(render(json))).get
- }
-
- def insert(i: EvaluationInstance): String = {
- try {
- val response = client.prepareIndex(index, estype).
- setSource(write(i)).get
- response.getId
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- ""
- }
- }
-
- def get(id: String): Option[EvaluationInstance] = {
- try {
- val response = client.prepareGet(index, estype, id).get
- if (response.isExists) {
- Some(read[EvaluationInstance](response.getSourceAsString))
- } else {
- None
- }
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- None
- }
- }
-
- def getAll(): Seq[EvaluationInstance] = {
- try {
- val builder = client.prepareSearch(index).setTypes(estype)
- ESUtils.getAll[EvaluationInstance](client, builder)
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq()
- }
- }
-
- def getCompleted(): Seq[EvaluationInstance] = {
- try {
- val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
- termFilter("status", "EVALCOMPLETED")).
- addSort("startTime", SortOrder.DESC)
- ESUtils.getAll[EvaluationInstance](client, builder)
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq()
- }
- }
-
- def update(i: EvaluationInstance): Unit = {
- try {
- client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
- } catch {
- case e: ElasticsearchException => error(e.getMessage)
- }
- }
-
- def delete(id: String): Unit = {
- try {
- client.prepareDelete(index, estype, id).get
- } catch {
- case e: ElasticsearchException => error(e.getMessage)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
deleted file mode 100644
index 5c9e170..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-
-class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging {
- implicit val formats = DefaultFormats
- private val estype = "sequences"
-
- val indices = client.admin.indices
- val indexExistResponse = indices.prepareExists(index).get
- if (!indexExistResponse.isExists) {
- // val settingsJson =
- // ("number_of_shards" -> 1) ~
- // ("auto_expand_replicas" -> "0-all")
- indices.prepareCreate(index).get
- }
- val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
- if (!typeExistResponse.isExists) {
- val mappingJson =
- (estype ->
- ("_source" -> ("enabled" -> 0)) ~
- ("_all" -> ("enabled" -> 0)) ~
- ("_type" -> ("index" -> "no")) ~
- ("enabled" -> 0))
- indices.preparePutMapping(index).setType(estype).
- setSource(compact(render(mappingJson))).get
- }
-
- def genNext(name: String): Int = {
- try {
- val response = client.prepareIndex(index, estype, name).
- setSource(compact(render("n" -> name))).get
- response.getVersion().toInt
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- 0
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
deleted file mode 100644
index f5c99bf..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch
-
-import org.elasticsearch.action.search.SearchRequestBuilder
-import org.elasticsearch.client.Client
-import org.elasticsearch.common.unit.TimeValue
-import org.json4s.Formats
-import org.json4s.native.Serialization.read
-
-import scala.collection.mutable.ArrayBuffer
-
-object ESUtils {
- val scrollLife = new TimeValue(60000)
-
- def getAll[T : Manifest](
- client: Client,
- builder: SearchRequestBuilder)(
- implicit formats: Formats): Seq[T] = {
- val results = ArrayBuffer[T]()
- var response = builder.setScroll(scrollLife).get
- var hits = response.getHits().hits()
- results ++= hits.map(h => read[T](h.getSourceAsString))
- while (hits.size > 0) {
- response = client.prepareSearchScroll(response.getScrollId).
- setScroll(scrollLife).get
- hits = response.getHits().hits()
- results ++= hits.map(h => read[T](h.getSourceAsString))
- }
- results
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
deleted file mode 100644
index 75ac2b0..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.BaseStorageClient
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.StorageClientException
-import org.elasticsearch.client.transport.TransportClient
-import org.elasticsearch.common.settings.ImmutableSettings
-import org.elasticsearch.common.transport.InetSocketTransportAddress
-import org.elasticsearch.transport.ConnectTransportException
-
-class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
- with Logging {
- override val prefix = "ES"
- val client = try {
- val hosts = config.properties.get("HOSTS").
- map(_.split(",").toSeq).getOrElse(Seq("localhost"))
- val ports = config.properties.get("PORTS").
- map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300))
- val settings = ImmutableSettings.settingsBuilder()
- .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch"))
- val transportClient = new TransportClient(settings)
- (hosts zip ports) foreach { hp =>
- transportClient.addTransportAddress(
- new InetSocketTransportAddress(hp._1, hp._2))
- }
- transportClient
- } catch {
- case e: ConnectTransportException =>
- throw new StorageClientException(e.getMessage, e)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
deleted file mode 100644
index 0c549b8..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage
-
-/** Elasticsearch implementation of storage traits, supporting meta data only
- *
- * @group Implementation
- */
-package object elasticsearch {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
deleted file mode 100644
index 2cdb734..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
+++ /dev/null
@@ -1,415 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.hbase
-
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.EventValidation
-import org.apache.predictionio.data.storage.DataMap
-
-import org.apache.hadoop.hbase.client.Result
-import org.apache.hadoop.hbase.client.Put
-import org.apache.hadoop.hbase.client.Scan
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.filter.FilterList
-import org.apache.hadoop.hbase.filter.RegexStringComparator
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
-import org.apache.hadoop.hbase.filter.BinaryComparator
-import org.apache.hadoop.hbase.filter.QualifierFilter
-import org.apache.hadoop.hbase.filter.SkipFilter
-
-import org.json4s.DefaultFormats
-import org.json4s.JObject
-import org.json4s.native.Serialization.{ read, write }
-
-import org.joda.time.DateTime
-import org.joda.time.DateTimeZone
-
-import org.apache.commons.codec.binary.Base64
-import java.security.MessageDigest
-
-import java.util.UUID
-
-/* common utility function for accessing EventsStore in HBase */
-object HBEventsUtil {
-
- implicit val formats = DefaultFormats
-
- def tableName(namespace: String, appId: Int, channelId: Option[Int] = None): String = {
- channelId.map { ch =>
- s"${namespace}:events_${appId}_${ch}"
- }.getOrElse {
- s"${namespace}:events_${appId}"
- }
- }
-
- // column names for "e" column family
- val colNames: Map[String, Array[Byte]] = Map(
- "event" -> "e",
- "entityType" -> "ety",
- "entityId" -> "eid",
- "targetEntityType" -> "tety",
- "targetEntityId" -> "teid",
- "properties" -> "p",
- "prId" -> "prid",
- "eventTime" -> "et",
- "eventTimeZone" -> "etz",
- "creationTime" -> "ct",
- "creationTimeZone" -> "ctz"
- ).mapValues(Bytes.toBytes(_))
-
- def hash(entityType: String, entityId: String): Array[Byte] = {
- val s = entityType + "-" + entityId
- // get a new MessageDigest object each time for thread-safe
- val md5 = MessageDigest.getInstance("MD5")
- md5.digest(Bytes.toBytes(s))
- }
-
- class RowKey(
- val b: Array[Byte]
- ) {
- require((b.size == 32), s"Incorrect b size: ${b.size}")
- lazy val entityHash: Array[Byte] = b.slice(0, 16)
- lazy val millis: Long = Bytes.toLong(b.slice(16, 24))
- lazy val uuidLow: Long = Bytes.toLong(b.slice(24, 32))
-
- lazy val toBytes: Array[Byte] = b
-
- override def toString: String = {
- Base64.encodeBase64URLSafeString(toBytes)
- }
- }
-
- object RowKey {
- def apply(
- entityType: String,
- entityId: String,
- millis: Long,
- uuidLow: Long): RowKey = {
- // add UUID least significant bits for multiple actions at the same time
- // (UUID's most significant bits are actually timestamp,
- // use eventTime instead).
- val b = hash(entityType, entityId) ++
- Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow)
- new RowKey(b)
- }
-
- // get RowKey from string representation
- def apply(s: String): RowKey = {
- try {
- apply(Base64.decodeBase64(s))
- } catch {
- case e: Exception => throw new RowKeyException(
- s"Failed to convert String ${s} to RowKey because ${e}", e)
- }
- }
-
- def apply(b: Array[Byte]): RowKey = {
- if (b.size != 32) {
- val bString = b.mkString(",")
- throw new RowKeyException(
- s"Incorrect byte array size. Bytes: ${bString}.")
- }
- new RowKey(b)
- }
-
- }
-
- class RowKeyException(val msg: String, val cause: Exception)
- extends Exception(msg, cause) {
- def this(msg: String) = this(msg, null)
- }
-
- case class PartialRowKey(entityType: String, entityId: String,
- millis: Option[Long] = None) {
- val toBytes: Array[Byte] = {
- hash(entityType, entityId) ++
- (millis.map(Bytes.toBytes(_)).getOrElse(Array[Byte]()))
- }
- }
-
- def eventToPut(event: Event, appId: Int): (Put, RowKey) = {
- // generate new rowKey if eventId is None
- val rowKey = event.eventId.map { id =>
- RowKey(id) // create rowKey from eventId
- }.getOrElse {
- // TOOD: use real UUID. not pseudo random
- val uuidLow: Long = UUID.randomUUID().getLeastSignificantBits
- RowKey(
- entityType = event.entityType,
- entityId = event.entityId,
- millis = event.eventTime.getMillis,
- uuidLow = uuidLow
- )
- }
-
- val eBytes = Bytes.toBytes("e")
- // use eventTime as HBase's cell timestamp
- val put = new Put(rowKey.toBytes, event.eventTime.getMillis)
-
- def addStringToE(col: Array[Byte], v: String): Put = {
- put.add(eBytes, col, Bytes.toBytes(v))
- }
-
- def addLongToE(col: Array[Byte], v: Long): Put = {
- put.add(eBytes, col, Bytes.toBytes(v))
- }
-
- addStringToE(colNames("event"), event.event)
- addStringToE(colNames("entityType"), event.entityType)
- addStringToE(colNames("entityId"), event.entityId)
-
- event.targetEntityType.foreach { targetEntityType =>
- addStringToE(colNames("targetEntityType"), targetEntityType)
- }
-
- event.targetEntityId.foreach { targetEntityId =>
- addStringToE(colNames("targetEntityId"), targetEntityId)
- }
-
- // TODO: make properties Option[]
- if (!event.properties.isEmpty) {
- addStringToE(colNames("properties"), write(event.properties.toJObject))
- }
-
- event.prId.foreach { prId =>
- addStringToE(colNames("prId"), prId)
- }
-
- addLongToE(colNames("eventTime"), event.eventTime.getMillis)
- val eventTimeZone = event.eventTime.getZone
- if (!eventTimeZone.equals(EventValidation.defaultTimeZone)) {
- addStringToE(colNames("eventTimeZone"), eventTimeZone.getID)
- }
-
- addLongToE(colNames("creationTime"), event.creationTime.getMillis)
- val creationTimeZone = event.creationTime.getZone
- if (!creationTimeZone.equals(EventValidation.defaultTimeZone)) {
- addStringToE(colNames("creationTimeZone"), creationTimeZone.getID)
- }
-
- // can use zero-length byte array for tag cell value
- (put, rowKey)
- }
-
- def resultToEvent(result: Result, appId: Int): Event = {
- val rowKey = RowKey(result.getRow())
-
- val eBytes = Bytes.toBytes("e")
- // val e = result.getFamilyMap(eBytes)
-
- def getStringCol(col: String): String = {
- val r = result.getValue(eBytes, colNames(col))
- require(r != null,
- s"Failed to get value for column ${col}. " +
- s"Rowkey: ${rowKey.toString} " +
- s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.")
-
- Bytes.toString(r)
- }
-
- def getLongCol(col: String): Long = {
- val r = result.getValue(eBytes, colNames(col))
- require(r != null,
- s"Failed to get value for column ${col}. " +
- s"Rowkey: ${rowKey.toString} " +
- s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.")
-
- Bytes.toLong(r)
- }
-
- def getOptStringCol(col: String): Option[String] = {
- val r = result.getValue(eBytes, colNames(col))
- if (r == null) {
- None
- } else {
- Some(Bytes.toString(r))
- }
- }
-
- def getTimestamp(col: String): Long = {
- result.getColumnLatestCell(eBytes, colNames(col)).getTimestamp()
- }
-
- val event = getStringCol("event")
- val entityType = getStringCol("entityType")
- val entityId = getStringCol("entityId")
- val targetEntityType = getOptStringCol("targetEntityType")
- val targetEntityId = getOptStringCol("targetEntityId")
- val properties: DataMap = getOptStringCol("properties")
- .map(s => DataMap(read[JObject](s))).getOrElse(DataMap())
- val prId = getOptStringCol("prId")
- val eventTimeZone = getOptStringCol("eventTimeZone")
- .map(DateTimeZone.forID(_))
- .getOrElse(EventValidation.defaultTimeZone)
- val eventTime = new DateTime(
- getLongCol("eventTime"), eventTimeZone)
- val creationTimeZone = getOptStringCol("creationTimeZone")
- .map(DateTimeZone.forID(_))
- .getOrElse(EventValidation.defaultTimeZone)
- val creationTime: DateTime = new DateTime(
- getLongCol("creationTime"), creationTimeZone)
-
- Event(
- eventId = Some(RowKey(result.getRow()).toString),
- event = event,
- entityType = entityType,
- entityId = entityId,
- targetEntityType = targetEntityType,
- targetEntityId = targetEntityId,
- properties = properties,
- eventTime = eventTime,
- tags = Seq(),
- prId = prId,
- creationTime = creationTime
- )
- }
-
-
- // for mandatory field. None means don't care.
- // for optional field. None means don't care.
- // Some(None) means not exist.
- // Some(Some(x)) means it should match x
- def createScan(
- startTime: Option[DateTime] = None,
- untilTime: Option[DateTime] = None,
- entityType: Option[String] = None,
- entityId: Option[String] = None,
- eventNames: Option[Seq[String]] = None,
- targetEntityType: Option[Option[String]] = None,
- targetEntityId: Option[Option[String]] = None,
- reversed: Option[Boolean] = None): Scan = {
-
- val scan: Scan = new Scan()
-
- (entityType, entityId) match {
- case (Some(et), Some(eid)) => {
- val start = PartialRowKey(et, eid,
- startTime.map(_.getMillis)).toBytes
- // if no untilTime, stop when reach next bytes of entityTypeAndId
- val stop = PartialRowKey(et, eid,
- untilTime.map(_.getMillis).orElse(Some(-1))).toBytes
-
- if (reversed.getOrElse(false)) {
- // Reversed order.
- // If you specify a startRow and stopRow,
- // to scan in reverse, the startRow needs to be lexicographically
- // after the stopRow.
- scan.setStartRow(stop)
- scan.setStopRow(start)
- scan.setReversed(true)
- } else {
- scan.setStartRow(start)
- scan.setStopRow(stop)
- }
- }
- case (_, _) => {
- val minTime: Long = startTime.map(_.getMillis).getOrElse(0)
- val maxTime: Long = untilTime.map(_.getMillis).getOrElse(Long.MaxValue)
- scan.setTimeRange(minTime, maxTime)
- if (reversed.getOrElse(false)) {
- scan.setReversed(true)
- }
- }
- }
-
- val filters = new FilterList(FilterList.Operator.MUST_PASS_ALL)
-
- val eBytes = Bytes.toBytes("e")
-
- def createBinaryFilter(col: String, value: Array[Byte]): SingleColumnValueFilter = {
- val comp = new BinaryComparator(value)
- new SingleColumnValueFilter(
- eBytes, colNames(col), CompareOp.EQUAL, comp)
- }
-
- // skip the row if the column exists
- def createSkipRowIfColumnExistFilter(col: String): SkipFilter = {
- val comp = new BinaryComparator(colNames(col))
- val q = new QualifierFilter(CompareOp.NOT_EQUAL, comp)
- // filters an entire row if any of the Cell checks do not pass
- new SkipFilter(q)
- }
-
- entityType.foreach { et =>
- val compType = new BinaryComparator(Bytes.toBytes(et))
- val filterType = new SingleColumnValueFilter(
- eBytes, colNames("entityType"), CompareOp.EQUAL, compType)
- filters.addFilter(filterType)
- }
-
- entityId.foreach { eid =>
- val compId = new BinaryComparator(Bytes.toBytes(eid))
- val filterId = new SingleColumnValueFilter(
- eBytes, colNames("entityId"), CompareOp.EQUAL, compId)
- filters.addFilter(filterId)
- }
-
- eventNames.foreach { eventsList =>
- // match any of event in the eventsList
- val eventFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE)
- eventsList.foreach { e =>
- val compEvent = new BinaryComparator(Bytes.toBytes(e))
- val filterEvent = new SingleColumnValueFilter(
- eBytes, colNames("event"), CompareOp.EQUAL, compEvent)
- eventFilters.addFilter(filterEvent)
- }
- if (!eventFilters.getFilters().isEmpty) {
- filters.addFilter(eventFilters)
- }
- }
-
- targetEntityType.foreach { tetOpt =>
- if (tetOpt.isEmpty) {
- val filter = createSkipRowIfColumnExistFilter("targetEntityType")
- filters.addFilter(filter)
- } else {
- tetOpt.foreach { tet =>
- val filter = createBinaryFilter(
- "targetEntityType", Bytes.toBytes(tet))
- // the entire row will be skipped if the column is not found.
- filter.setFilterIfMissing(true)
- filters.addFilter(filter)
- }
- }
- }
-
- targetEntityId.foreach { teidOpt =>
- if (teidOpt.isEmpty) {
- val filter = createSkipRowIfColumnExistFilter("targetEntityId")
- filters.addFilter(filter)
- } else {
- teidOpt.foreach { teid =>
- val filter = createBinaryFilter(
- "targetEntityId", Bytes.toBytes(teid))
- // the entire row will be skipped if the column is not found.
- filter.setFilterIfMissing(true)
- filters.addFilter(filter)
- }
- }
- }
-
- if (!filters.getFilters().isEmpty) {
- scan.setFilter(filters)
- }
-
- scan
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
deleted file mode 100644
index 360b007..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.hbase
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.LEvents
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.hbase.HBEventsUtil.RowKey
-import org.apache.hadoop.hbase.HColumnDescriptor
-import org.apache.hadoop.hbase.HTableDescriptor
-import org.apache.hadoop.hbase.NamespaceDescriptor
-import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.client._
-import org.joda.time.DateTime
-
-import scala.collection.JavaConversions._
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-
-class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace: String)
- extends LEvents with Logging {
-
- // implicit val formats = DefaultFormats + new EventJson4sSupport.DBSerializer
-
- def resultToEvent(result: Result, appId: Int): Event =
- HBEventsUtil.resultToEvent(result, appId)
-
- def getTable(appId: Int, channelId: Option[Int] = None): HTableInterface =
- client.connection.getTable(HBEventsUtil.tableName(namespace, appId, channelId))
-
- override
- def init(appId: Int, channelId: Option[Int] = None): Boolean = {
- // check namespace exist
- val existingNamespace = client.admin.listNamespaceDescriptors()
- .map(_.getName)
- if (!existingNamespace.contains(namespace)) {
- val nameDesc = NamespaceDescriptor.create(namespace).build()
- info(s"The namespace ${namespace} doesn't exist yet. Creating now...")
- client.admin.createNamespace(nameDesc)
- }
-
- val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId))
- if (!client.admin.tableExists(tableName)) {
- info(s"The table ${tableName.getNameAsString()} doesn't exist yet." +
- " Creating now...")
- val tableDesc = new HTableDescriptor(tableName)
- tableDesc.addFamily(new HColumnDescriptor("e"))
- tableDesc.addFamily(new HColumnDescriptor("r")) // reserved
- client.admin.createTable(tableDesc)
- }
- true
- }
-
- override
- def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
- val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId))
- try {
- if (client.admin.tableExists(tableName)) {
- info(s"Removing table ${tableName.getNameAsString()}...")
- client.admin.disableTable(tableName)
- client.admin.deleteTable(tableName)
- } else {
- info(s"Table ${tableName.getNameAsString()} doesn't exist." +
- s" Nothing is deleted.")
- }
- true
- } catch {
- case e: Exception => {
- error(s"Fail to remove table for appId ${appId}. Exception: ${e}")
- false
- }
- }
- }
-
- override
- def close(): Unit = {
- client.admin.close()
- client.connection.close()
- }
-
- override
- def futureInsert(
- event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
- Future[String] = {
- Future {
- val table = getTable(appId, channelId)
- val (put, rowKey) = HBEventsUtil.eventToPut(event, appId)
- table.put(put)
- table.flushCommits()
- table.close()
- rowKey.toString
- }
- }
-
- override
- def futureGet(
- eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
- Future[Option[Event]] = {
- Future {
- val table = getTable(appId, channelId)
- val rowKey = RowKey(eventId)
- val get = new Get(rowKey.toBytes)
-
- val result = table.get(get)
- table.close()
-
- if (!result.isEmpty()) {
- val event = resultToEvent(result, appId)
- Some(event)
- } else {
- None
- }
- }
- }
-
- override
- def futureDelete(
- eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
- Future[Boolean] = {
- Future {
- val table = getTable(appId, channelId)
- val rowKey = RowKey(eventId)
- val exists = table.exists(new Get(rowKey.toBytes))
- table.delete(new Delete(rowKey.toBytes))
- table.close()
- exists
- }
- }
-
- override
- def futureFind(
- appId: Int,
- channelId: Option[Int] = None,
- startTime: Option[DateTime] = None,
- untilTime: Option[DateTime] = None,
- entityType: Option[String] = None,
- entityId: Option[String] = None,
- eventNames: Option[Seq[String]] = None,
- targetEntityType: Option[Option[String]] = None,
- targetEntityId: Option[Option[String]] = None,
- limit: Option[Int] = None,
- reversed: Option[Boolean] = None)(implicit ec: ExecutionContext):
- Future[Iterator[Event]] = {
- Future {
-
- require(!((reversed == Some(true)) && (entityType.isEmpty || entityId.isEmpty)),
- "the parameter reversed can only be used with both entityType and entityId specified.")
-
- val table = getTable(appId, channelId)
-
- val scan = HBEventsUtil.createScan(
- startTime = startTime,
- untilTime = untilTime,
- entityType = entityType,
- entityId = entityId,
- eventNames = eventNames,
- targetEntityType = targetEntityType,
- targetEntityId = targetEntityId,
- reversed = reversed)
- val scanner = table.getScanner(scan)
- table.close()
-
- val eventsIter = scanner.iterator()
-
- // Get all events if None or Some(-1)
- val results: Iterator[Result] = limit match {
- case Some(-1) => eventsIter
- case None => eventsIter
- case Some(x) => eventsIter.take(x)
- }
-
- val eventsIt = results.map { resultToEvent(_, appId) }
-
- eventsIt
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala
deleted file mode 100644
index 7324fa6..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.data.storage.hbase
-
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.client.{Delete, HTable, Result}
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.OutputFormat
-import org.apache.predictionio.data.storage.{Event, PEvents, StorageClientConfig}
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.joda.time.DateTime
-
-class HBPEvents(client: HBClient, config: StorageClientConfig, namespace: String) extends PEvents {
-
- def checkTableExists(appId: Int, channelId: Option[Int]): Unit = {
- if (!client.admin.tableExists(HBEventsUtil.tableName(namespace, appId, channelId))) {
- if (channelId.nonEmpty) {
- logger.error(s"The appId $appId with channelId $channelId does not exist." +
- s" Please use valid appId and channelId.")
- throw new Exception(s"HBase table not found for appId $appId" +
- s" with channelId $channelId.")
- } else {
- logger.error(s"The appId $appId does not exist. Please use valid appId.")
- throw new Exception(s"HBase table not found for appId $appId.")
- }
- }
- }
-
- override
- def find(
- appId: Int,
- channelId: Option[Int] = None,
- startTime: Option[DateTime] = None,
- untilTime: Option[DateTime] = None,
- entityType: Option[String] = None,
- entityId: Option[String] = None,
- eventNames: Option[Seq[String]] = None,
- targetEntityType: Option[Option[String]] = None,
- targetEntityId: Option[Option[String]] = None
- )(sc: SparkContext): RDD[Event] = {
-
- checkTableExists(appId, channelId)
-
- val conf = HBaseConfiguration.create()
- conf.set(TableInputFormat.INPUT_TABLE,
- HBEventsUtil.tableName(namespace, appId, channelId))
-
- val scan = HBEventsUtil.createScan(
- startTime = startTime,
- untilTime = untilTime,
- entityType = entityType,
- entityId = entityId,
- eventNames = eventNames,
- targetEntityType = targetEntityType,
- targetEntityId = targetEntityId,
- reversed = None)
- scan.setCaching(500) // TODO
- scan.setCacheBlocks(false) // TODO
-
- conf.set(TableInputFormat.SCAN, PIOHBaseUtil.convertScanToString(scan))
-
- // HBase is not accessed until this rdd is actually used.
- val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
- classOf[ImmutableBytesWritable],
- classOf[Result]).map {
- case (key, row) => HBEventsUtil.resultToEvent(row, appId)
- }
-
- rdd
- }
-
- override
- def write(
- events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
-
- checkTableExists(appId, channelId)
-
- val conf = HBaseConfiguration.create()
- conf.set(TableOutputFormat.OUTPUT_TABLE,
- HBEventsUtil.tableName(namespace, appId, channelId))
- conf.setClass("mapreduce.outputformat.class",
- classOf[TableOutputFormat[Object]],
- classOf[OutputFormat[Object, Writable]])
-
- events.map { event =>
- val (put, rowKey) = HBEventsUtil.eventToPut(event, appId)
- (new ImmutableBytesWritable(rowKey.toBytes), put)
- }.saveAsNewAPIHadoopDataset(conf)
-
- }
-
- def delete(
- eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
-
- checkTableExists(appId, channelId)
-
- val tableName = HBEventsUtil.tableName(namespace, appId, channelId)
-
- eventIds.foreachPartition{ iter =>
- val conf = HBaseConfiguration.create()
- conf.set(TableOutputFormat.OUTPUT_TABLE,
- tableName)
-
- val table = new HTable(conf, tableName)
- iter.foreach { id =>
- val rowKey = HBEventsUtil.RowKey(id)
- val delete = new Delete(rowKey.b)
- table.delete(delete)
- }
- table.close
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala
deleted file mode 100644
index 745fcb9..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.data.storage.hbase
-
-import org.apache.hadoop.hbase.client.Scan
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil
-import org.apache.hadoop.hbase.util.Base64
-
-object PIOHBaseUtil {
- /*
- * Copying this from Apache HBase because of its restrictive scope in 0.98.x
- */
- def convertScanToString(scan: Scan): String = {
- val proto = ProtobufUtil.toScan(scan)
- Base64.encodeBytes(proto.toByteArray)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala
deleted file mode 100644
index 1720410..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.hbase
-
-import org.apache.predictionio.data.storage.BaseStorageClient
-import org.apache.predictionio.data.storage.StorageClientConfig
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.MasterNotRunningException
-import org.apache.hadoop.hbase.ZooKeeperConnectionException
-import org.apache.hadoop.hbase.client.HConnectionManager
-import org.apache.hadoop.hbase.client.HConnection
-import org.apache.hadoop.hbase.client.HBaseAdmin
-
-import grizzled.slf4j.Logging
-
-case class HBClient(
- val conf: Configuration,
- val connection: HConnection,
- val admin: HBaseAdmin
-)
-
-class StorageClient(val config: StorageClientConfig)
- extends BaseStorageClient with Logging {
-
- val conf = HBaseConfiguration.create()
-
- if (config.test) {
- // use fewer retries and shorter timeout for test mode
- conf.set("hbase.client.retries.number", "1")
- conf.set("zookeeper.session.timeout", "30000");
- conf.set("zookeeper.recovery.retry", "1")
- }
-
- try {
- HBaseAdmin.checkHBaseAvailable(conf)
- } catch {
- case e: MasterNotRunningException =>
- error("HBase master is not running (ZooKeeper ensemble: " +
- conf.get("hbase.zookeeper.quorum") + "). Please make sure that HBase " +
- "is running properly, and that the configuration is pointing at the " +
- "correct ZooKeeper ensemble.")
- throw e
- case e: ZooKeeperConnectionException =>
- error("Cannot connect to ZooKeeper (ZooKeeper ensemble: " +
- conf.get("hbase.zookeeper.quorum") + "). Please make sure that the " +
- "configuration is pointing at the correct ZooKeeper ensemble. By " +
- "default, HBase manages its own ZooKeeper, so if you have not " +
- "configured HBase to use an external ZooKeeper, that means your " +
- "HBase is not started or configured properly.")
- throw e
- case e: Exception => {
- error("Failed to connect to HBase." +
- " Please check if HBase is running properly.")
- throw e
- }
- }
-
- val connection = HConnectionManager.createConnection(conf)
-
- val client = HBClient(
- conf = conf,
- connection = connection,
- admin = new HBaseAdmin(connection)
- )
-
- override
- val prefix = "HB"
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala
deleted file mode 100644
index 49bf031..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage
-
-/** HBase implementation of storage traits, supporting event data only
- *
- * @group Implementation
- */
-package object hbase {}