You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/03/08 07:45:58 UTC
[3/7] incubator-predictionio git commit: [PIO-49] Add support for
Elasticsearch 5
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
new file mode 100644
index 0000000..077168a
--- /dev/null
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.AccessKey
+import org.apache.predictionio.data.storage.AccessKeys
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+import scala.util.Random
+
+/** Elasticsearch 1.x implementation of the AccessKeys metadata store.
+ * Access keys are stored as documents of type "accesskeys" in the given
+ * index, using the key string itself as the document id.
+ */
+class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
+ extends AccessKeys with Logging {
+ implicit val formats = DefaultFormats.lossless
+ private val estype = "accesskeys"
+
+ // Constructor side effect: create the index and the type mapping if absent.
+ val indices = client.admin.indices
+ val indexExistResponse = indices.prepareExists(index).get
+ if (!indexExistResponse.isExists) {
+ indices.prepareCreate(index).get
+ }
+ val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+ if (!typeExistResponse.isExists) {
+ // "not_analyzed" keeps key/events values exact-match filterable.
+ val json =
+ (estype ->
+ ("properties" ->
+ ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+ indices.preparePutMapping(index).setType(estype).
+ setSource(compact(render(json))).get
+ }
+
+ /** Stores the access key, generating a fresh key when the given one is
+ * empty (generateKey presumably comes from the AccessKeys trait — confirm).
+ * Returns the key that was actually stored.
+ */
+ def insert(accessKey: AccessKey): Option[String] = {
+ val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
+ update(accessKey.copy(key = key))
+ Some(key)
+ }
+
+ /** Looks up an access key by its key string.
+ * NOTE(review): getSourceAsString is null when the document does not
+ * exist, so the NPE catch doubles as the "not found" path — fragile but
+ * intentional here.
+ */
+ def get(key: String): Option[AccessKey] = {
+ try {
+ val response = client.prepareGet(
+ index,
+ estype,
+ key).get()
+ Some(read[AccessKey](response.getSourceAsString))
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ None
+ case e: NullPointerException => None
+ }
+ }
+
+ /** Returns every stored access key; empty Seq on Elasticsearch errors (logged). */
+ def getAll(): Seq[AccessKey] = {
+ try {
+ val builder = client.prepareSearch(index).setTypes(estype)
+ ESUtils.getAll[AccessKey](client, builder)
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ Seq[AccessKey]()
+ }
+ }
+
+ /** Returns all access keys belonging to the given app id. */
+ def getByAppid(appid: Int): Seq[AccessKey] = {
+ try {
+ val builder = client.prepareSearch(index).setTypes(estype).
+ setPostFilter(termFilter("appid", appid))
+ ESUtils.getAll[AccessKey](client, builder)
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ Seq[AccessKey]()
+ }
+ }
+
+ /** Upserts the access key document (index request overwrites by id). */
+ def update(accessKey: AccessKey): Unit = {
+ try {
+ client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get()
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ }
+ }
+
+ /** Deletes the access key document with the given key; errors are logged only. */
+ def delete(key: String): Unit = {
+ try {
+ client.prepareDelete(index, estype, key).get
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
new file mode 100644
index 0000000..3781a4b
--- /dev/null
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.App
+import org.apache.predictionio.data.storage.Apps
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Elasticsearch 1.x implementation of the Apps metadata store.
+ * (Original comment said "Items"; this class stores App metadata.)
+ * App documents of type "apps" are keyed by their integer id.
+ */
+class ESApps(client: Client, config: StorageClientConfig, index: String)
+ extends Apps with Logging {
+ implicit val formats = DefaultFormats.lossless
+ private val estype = "apps"
+ private val seq = new ESSequences(client, config, index)
+
+ // Constructor side effect: create the index and the type mapping if absent.
+ val indices = client.admin.indices
+ val indexExistResponse = indices.prepareExists(index).get
+ if (!indexExistResponse.isExists) {
+ indices.prepareCreate(index).get
+ }
+ val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+ if (!typeExistResponse.isExists) {
+ val json =
+ (estype ->
+ ("properties" ->
+ ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+ indices.preparePutMapping(index).setType(estype).
+ setSource(compact(render(json))).get
+ }
+
+ /** Stores the app, drawing a new id from the "apps" sequence when id == 0.
+ * NOTE(review): the probe loop (genNext until get(roll) is empty) is not
+ * atomic — two concurrent inserts could race to the same id.
+ */
+ def insert(app: App): Option[Int] = {
+ val id =
+ if (app.id == 0) {
+ var roll = seq.genNext("apps")
+ while (!get(roll).isEmpty) roll = seq.genNext("apps")
+ roll
+ }
+ else app.id
+ val realapp = app.copy(id = id)
+ update(realapp)
+ Some(id)
+ }
+
+ /** Looks up an app by id. NPE catch handles the missing-document case,
+ * where getSourceAsString is null. */
+ def get(id: Int): Option[App] = {
+ try {
+ val response = client.prepareGet(
+ index,
+ estype,
+ id.toString).get()
+ Some(read[App](response.getSourceAsString))
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ None
+ case e: NullPointerException => None
+ }
+ }
+
+ /** Finds an app by exact name (first hit wins if several match). */
+ def getByName(name: String): Option[App] = {
+ try {
+ val response = client.prepareSearch(index).setTypes(estype).
+ setPostFilter(termFilter("name", name)).get
+ val hits = response.getHits().hits()
+ if (hits.size > 0) {
+ Some(read[App](hits.head.getSourceAsString))
+ } else {
+ None
+ }
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ None
+ }
+ }
+
+ /** Returns every stored app; empty Seq on Elasticsearch errors (logged). */
+ def getAll(): Seq[App] = {
+ try {
+ val builder = client.prepareSearch(index).setTypes(estype)
+ ESUtils.getAll[App](client, builder)
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ Seq[App]()
+ }
+ }
+
+ /** Upserts the app document; the index request overwrites by id. */
+ def update(app: App): Unit = {
+ try {
+ val response = client.prepareIndex(index, estype, app.id.toString).
+ setSource(write(app)).get()
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ }
+ }
+
+ /** Deletes the app document with the given id; errors are logged only. */
+ def delete(id: Int): Unit = {
+ try {
+ client.prepareDelete(index, estype, id.toString).get
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
new file mode 100644
index 0000000..52697fd
--- /dev/null
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Channel
+import org.apache.predictionio.data.storage.Channels
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders.termFilter
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Elasticsearch 1.x implementation of the Channels metadata store.
+ * Channel documents of type "channels" are keyed by their integer id,
+ * which is drawn from the shared ESSequences generator.
+ */
+class ESChannels(client: Client, config: StorageClientConfig, index: String)
+ extends Channels with Logging {
+
+ implicit val formats = DefaultFormats.lossless
+ private val estype = "channels"
+ private val seq = new ESSequences(client, config, index)
+ private val seqName = "channels"
+
+ // Constructor side effect: create the index and the type mapping if absent.
+ val indices = client.admin.indices
+ val indexExistResponse = indices.prepareExists(index).get
+ if (!indexExistResponse.isExists) {
+ indices.prepareCreate(index).get
+ }
+ val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+ if (!typeExistResponse.isExists) {
+ val json =
+ (estype ->
+ ("properties" ->
+ ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+ indices.preparePutMapping(index).setType(estype).
+ setSource(compact(render(json))).get
+ }
+
+ /** Stores the channel, drawing a new id when id == 0. Unlike ESApps.insert,
+ * this propagates update failure by returning None.
+ * NOTE(review): the id probe loop is not atomic under concurrent inserts.
+ */
+ def insert(channel: Channel): Option[Int] = {
+ val id =
+ if (channel.id == 0) {
+ var roll = seq.genNext(seqName)
+ while (!get(roll).isEmpty) roll = seq.genNext(seqName)
+ roll
+ } else channel.id
+
+ val realChannel = channel.copy(id = id)
+ if (update(realChannel)) Some(id) else None
+ }
+
+ /** Looks up a channel by id. NPE catch handles the missing-document case,
+ * where getSourceAsString is null. */
+ def get(id: Int): Option[Channel] = {
+ try {
+ val response = client.prepareGet(
+ index,
+ estype,
+ id.toString).get()
+ Some(read[Channel](response.getSourceAsString))
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ None
+ case e: NullPointerException => None
+ }
+ }
+
+ /** Returns all channels belonging to the given app id. */
+ def getByAppid(appid: Int): Seq[Channel] = {
+ try {
+ val builder = client.prepareSearch(index).setTypes(estype).
+ setPostFilter(termFilter("appid", appid))
+ ESUtils.getAll[Channel](client, builder)
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ Seq[Channel]()
+ }
+ }
+
+ /** Upserts the channel document; returns true on success, false on error. */
+ def update(channel: Channel): Boolean = {
+ try {
+ val response = client.prepareIndex(index, estype, channel.id.toString).
+ setSource(write(channel)).get()
+ true
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ false
+ }
+ }
+
+ /** Deletes the channel document with the given id; errors are logged only. */
+ def delete(id: Int): Unit = {
+ try {
+ client.prepareDelete(index, estype, id.toString).get
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
new file mode 100644
index 0000000..21690bf
--- /dev/null
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineInstance
+import org.apache.predictionio.data.storage.EngineInstanceSerializer
+import org.apache.predictionio.data.storage.EngineInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.elasticsearch.search.sort.SortOrder
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Elasticsearch 1.x implementation of the EngineInstances metadata store.
+ * Engine instance documents of type "engine_instances" use
+ * Elasticsearch-generated ids (assigned on insert).
+ */
+class ESEngineInstances(client: Client, config: StorageClientConfig, index: String)
+ extends EngineInstances with Logging {
+ implicit val formats = DefaultFormats + new EngineInstanceSerializer
+ private val estype = "engine_instances"
+
+ // Constructor side effect: create the index and the type mapping if absent.
+ val indices = client.admin.indices
+ val indexExistResponse = indices.prepareExists(index).get
+ if (!indexExistResponse.isExists) {
+ indices.prepareCreate(index).get
+ }
+ val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+ if (!typeExistResponse.isExists) {
+ val json =
+ (estype ->
+ ("properties" ->
+ ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("startTime" -> ("type" -> "date")) ~
+ ("endTime" -> ("type" -> "date")) ~
+ ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("engineVersion" ->
+ ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("engineVariant" ->
+ ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("engineFactory" ->
+ ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("batch" ->
+ ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("dataSourceParams" ->
+ ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("preparatorParams" ->
+ ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("algorithmsParams" ->
+ ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("servingParams" ->
+ ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ // NOTE(review): "status" is declared twice in this mapping (see the
+ // first property above) — the duplicate key below is redundant and
+ // should be removed.
+ ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+ indices.preparePutMapping(index).setType(estype).
+ setSource(compact(render(json))).get
+ }
+
+ /** Indexes a new engine instance and returns the generated document id.
+ * NOTE(review): returns "" as an error sentinel, which callers can
+ * easily mistake for a valid id.
+ */
+ def insert(i: EngineInstance): String = {
+ try {
+ val response = client.prepareIndex(index, estype).
+ setSource(write(i)).get
+ response.getId
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ ""
+ }
+ }
+
+ /** Looks up an engine instance by document id; None when absent or on error. */
+ def get(id: String): Option[EngineInstance] = {
+ try {
+ val response = client.prepareGet(index, estype, id).get
+ if (response.isExists) {
+ Some(read[EngineInstance](response.getSourceAsString))
+ } else {
+ None
+ }
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ None
+ }
+ }
+
+ /** Returns every stored engine instance; empty Seq on errors (logged). */
+ def getAll(): Seq[EngineInstance] = {
+ try {
+ val builder = client.prepareSearch(index).setTypes(estype)
+ ESUtils.getAll[EngineInstance](client, builder)
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ Seq()
+ }
+ }
+
+ /** Returns COMPLETED instances matching the engine coordinates, newest
+ * startTime first. */
+ def getCompleted(
+ engineId: String,
+ engineVersion: String,
+ engineVariant: String): Seq[EngineInstance] = {
+ try {
+ val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
+ andFilter(
+ termFilter("status", "COMPLETED"),
+ termFilter("engineId", engineId),
+ termFilter("engineVersion", engineVersion),
+ termFilter("engineVariant", engineVariant))).
+ addSort("startTime", SortOrder.DESC)
+ ESUtils.getAll[EngineInstance](client, builder)
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ Seq()
+ }
+ }
+
+ /** Most recently started COMPLETED instance, relying on getCompleted's
+ * descending startTime sort. */
+ def getLatestCompleted(
+ engineId: String,
+ engineVersion: String,
+ engineVariant: String): Option[EngineInstance] =
+ getCompleted(
+ engineId,
+ engineVersion,
+ engineVariant).headOption
+
+ /** Partially updates the stored document with the serialized instance. */
+ def update(i: EngineInstance): Unit = {
+ try {
+ client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+ } catch {
+ case e: ElasticsearchException => error(e.getMessage)
+ }
+ }
+
+ /** Deletes the engine instance document; errors are logged only. */
+ def delete(id: String): Unit = {
+ try {
+ val response = client.prepareDelete(index, estype, id).get
+ } catch {
+ case e: ElasticsearchException => error(e.getMessage)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
new file mode 100644
index 0000000..85bf820
--- /dev/null
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EvaluationInstance
+import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
+import org.apache.predictionio.data.storage.EvaluationInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.elasticsearch.search.sort.SortOrder
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Elasticsearch 1.x implementation of the EvaluationInstances metadata
+ * store. Evaluation instance documents of type "evaluation_instances" use
+ * Elasticsearch-generated ids (assigned on insert).
+ */
+class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String)
+ extends EvaluationInstances with Logging {
+ implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
+ private val estype = "evaluation_instances"
+
+ // Constructor side effect: create the index and the type mapping if absent.
+ val indices = client.admin.indices
+ val indexExistResponse = indices.prepareExists(index).get
+ if (!indexExistResponse.isExists) {
+ indices.prepareCreate(index).get
+ }
+ val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+ if (!typeExistResponse.isExists) {
+ // Result payloads use ("index" -> "no"): stored but not searchable.
+ val json =
+ (estype ->
+ ("properties" ->
+ ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("startTime" -> ("type" -> "date")) ~
+ ("endTime" -> ("type" -> "date")) ~
+ ("evaluationClass" ->
+ ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("engineParamsGeneratorClass" ->
+ ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("batch" ->
+ ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+ ("evaluatorResults" ->
+ ("type" -> "string") ~ ("index" -> "no")) ~
+ ("evaluatorResultsHTML" ->
+ ("type" -> "string") ~ ("index" -> "no")) ~
+ ("evaluatorResultsJSON" ->
+ ("type" -> "string") ~ ("index" -> "no"))))
+ indices.preparePutMapping(index).setType(estype).
+ setSource(compact(render(json))).get
+ }
+
+ /** Indexes a new evaluation instance and returns the generated document id.
+ * NOTE(review): returns "" as an error sentinel (same caveat as
+ * ESEngineInstances.insert).
+ */
+ def insert(i: EvaluationInstance): String = {
+ try {
+ val response = client.prepareIndex(index, estype).
+ setSource(write(i)).get
+ response.getId
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ ""
+ }
+ }
+
+ /** Looks up an evaluation instance by document id; None when absent or on error. */
+ def get(id: String): Option[EvaluationInstance] = {
+ try {
+ val response = client.prepareGet(index, estype, id).get
+ if (response.isExists) {
+ Some(read[EvaluationInstance](response.getSourceAsString))
+ } else {
+ None
+ }
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ None
+ }
+ }
+
+ /** Returns every stored evaluation instance; empty Seq on errors (logged). */
+ def getAll(): Seq[EvaluationInstance] = {
+ try {
+ val builder = client.prepareSearch(index).setTypes(estype)
+ ESUtils.getAll[EvaluationInstance](client, builder)
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ Seq()
+ }
+ }
+
+ /** Returns EVALCOMPLETED instances, newest startTime first. */
+ def getCompleted(): Seq[EvaluationInstance] = {
+ try {
+ val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
+ termFilter("status", "EVALCOMPLETED")).
+ addSort("startTime", SortOrder.DESC)
+ ESUtils.getAll[EvaluationInstance](client, builder)
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ Seq()
+ }
+ }
+
+ /** Partially updates the stored document with the serialized instance. */
+ def update(i: EvaluationInstance): Unit = {
+ try {
+ client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+ } catch {
+ case e: ElasticsearchException => error(e.getMessage)
+ }
+ }
+
+ /** Deletes the evaluation instance document; errors are logged only. */
+ def delete(id: String): Unit = {
+ try {
+ client.prepareDelete(index, estype, id).get
+ } catch {
+ case e: ElasticsearchException => error(e.getMessage)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
new file mode 100644
index 0000000..5c9e170
--- /dev/null
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+
+/** Monotonic sequence generator backed by Elasticsearch document versions:
+ * re-indexing the same document id bumps its version, which serves as the
+ * next sequence number. One document per sequence name, type "sequences".
+ */
+class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging {
+ implicit val formats = DefaultFormats
+ private val estype = "sequences"
+
+ // Constructor side effect: create the index and the type mapping if absent.
+ val indices = client.admin.indices
+ val indexExistResponse = indices.prepareExists(index).get
+ if (!indexExistResponse.isExists) {
+ // val settingsJson =
+ // ("number_of_shards" -> 1) ~
+ // ("auto_expand_replicas" -> "0-all")
+ indices.prepareCreate(index).get
+ }
+ val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+ if (!typeExistResponse.isExists) {
+ // Sequence documents carry no useful payload, so _source/_all are disabled.
+ val mappingJson =
+ (estype ->
+ ("_source" -> ("enabled" -> 0)) ~
+ ("_all" -> ("enabled" -> 0)) ~
+ ("_type" -> ("index" -> "no")) ~
+ ("enabled" -> 0))
+ indices.preparePutMapping(index).setType(estype).
+ setSource(compact(render(mappingJson))).get
+ }
+
+ /** Returns the next value of the named sequence (the new document version).
+ * NOTE(review): getVersion() is a Long narrowed with toInt — overflows past
+ * Int.MaxValue — and 0 is returned as an error sentinel, which is
+ * indistinguishable from a legitimate "no id yet" value to callers.
+ */
+ def genNext(name: String): Int = {
+ try {
+ val response = client.prepareIndex(index, estype, name).
+ setSource(compact(render("n" -> name))).get
+ response.getVersion().toInt
+ } catch {
+ case e: ElasticsearchException =>
+ error(e.getMessage)
+ 0
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
new file mode 100644
index 0000000..f5c99bf
--- /dev/null
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import org.elasticsearch.action.search.SearchRequestBuilder
+import org.elasticsearch.client.Client
+import org.elasticsearch.common.unit.TimeValue
+import org.json4s.Formats
+import org.json4s.native.Serialization.read
+
+import scala.collection.mutable.ArrayBuffer
+
+/** Shared helpers for the Elasticsearch 1.x storage implementations. */
+object ESUtils {
+ // Scroll keep-alive: 60 seconds per page fetch.
+ val scrollLife = new TimeValue(60000)
+
+ /** Exhaustively fetches all hits for the given search using the scroll
+ * API, deserializing each hit's _source into T via json4s.
+ *
+ * The loop stops when a scroll page comes back empty.
+ * NOTE(review): the scroll context is never explicitly cleared, so each
+ * call holds server-side resources until scrollLife expires.
+ */
+ def getAll[T : Manifest](
+ client: Client,
+ builder: SearchRequestBuilder)(
+ implicit formats: Formats): Seq[T] = {
+ val results = ArrayBuffer[T]()
+ var response = builder.setScroll(scrollLife).get
+ var hits = response.getHits().hits()
+ results ++= hits.map(h => read[T](h.getSourceAsString))
+ while (hits.size > 0) {
+ response = client.prepareSearchScroll(response.getScrollId).
+ setScroll(scrollLife).get
+ hits = response.getHits().hits()
+ results ++= hits.map(h => read[T](h.getSourceAsString))
+ }
+ results
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
new file mode 100644
index 0000000..75ac2b0
--- /dev/null
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.transport.TransportClient
+import org.elasticsearch.common.settings.ImmutableSettings
+import org.elasticsearch.common.transport.InetSocketTransportAddress
+import org.elasticsearch.transport.ConnectTransportException
+
+/** Builds the shared Elasticsearch 1.x TransportClient from storage config.
+ * Reads HOSTS (comma-separated, default "localhost"), PORTS
+ * (comma-separated ints, default 9300) and CLUSTERNAME (default
+ * "elasticsearch") from config.properties.
+ */
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+ with Logging {
+ override val prefix = "ES"
+ val client = try {
+ val hosts = config.properties.get("HOSTS").
+ map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+ val ports = config.properties.get("PORTS").
+ map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300))
+ val settings = ImmutableSettings.settingsBuilder()
+ .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch"))
+ val transportClient = new TransportClient(settings)
+ // NOTE(review): zip silently drops the extras when HOSTS and PORTS have
+ // different lengths — a misconfiguration goes unnoticed here.
+ (hosts zip ports) foreach { hp =>
+ transportClient.addTransportAddress(
+ new InetSocketTransportAddress(hp._1, hp._2))
+ }
+ transportClient
+ } catch {
+ // Surface connection failures as the storage layer's own exception type.
+ case e: ConnectTransportException =>
+ throw new StorageClientException(e.getMessage, e)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
new file mode 100644
index 0000000..0c549b8
--- /dev/null
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage
+
+/** Elasticsearch implementation of storage traits, supporting meta data only
+ * (access keys, apps, channels, engine instances, etc. -- not event data).
+ *
+ * @group Implementation
+ */
+package object elasticsearch {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/test/resources/application.conf b/storage/elasticsearch1/src/test/resources/application.conf
new file mode 100644
index 0000000..eecae44
--- /dev/null
+++ b/storage/elasticsearch1/src/test/resources/application.conf
@@ -0,0 +1,28 @@
+org.apache.predictionio.data.storage {
+ sources {
+ mongodb {
+ type = mongodb
+ hosts = [localhost]
+ ports = [27017]
+ }
+ elasticsearch {
+ type = elasticsearch
+ hosts = [localhost]
+ ports = [9300]
+ }
+ }
+ repositories {
+ # This section is a placeholder whose only purpose is to satisfy storage
+ # initialization; the actual tests bypass these repository settings completely.
+ # Please refer to StorageTestUtils.scala.
+ settings {
+ name = "test_predictionio"
+ source = mongodb
+ }
+
+ appdata {
+ name = "test_predictionio_appdata"
+ source = mongodb
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/.gitignore
----------------------------------------------------------------------
diff --git a/storage/hbase/.gitignore b/storage/hbase/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/storage/hbase/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/build.sbt
----------------------------------------------------------------------
diff --git a/storage/hbase/build.sbt b/storage/hbase/build.sbt
new file mode 100644
index 0000000..5856a5e
--- /dev/null
+++ b/storage/hbase/build.sbt
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// sbt build definition for the HBase storage driver module.
+name := "apache-predictionio-data-hbase"
+
+libraryDependencies ++= Seq(
+ "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
+ "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided",
+ "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
+ "org.apache.hbase" % "hbase-common" % "0.98.5-hadoop2",
+ "org.apache.hbase" % "hbase-client" % "0.98.5-hadoop2"
+ exclude("org.apache.zookeeper", "zookeeper"),
+ // added for Parallel storage interface
+ // (excludes avoid servlet/jetty classpath clashes with Spark)
+ "org.apache.hbase" % "hbase-server" % "0.98.5-hadoop2"
+ exclude("org.apache.hbase", "hbase-client")
+ exclude("org.apache.zookeeper", "zookeeper")
+ exclude("javax.servlet", "servlet-api")
+ exclude("org.mortbay.jetty", "servlet-api-2.5")
+ exclude("org.mortbay.jetty", "jsp-api-2.1")
+ exclude("org.mortbay.jetty", "jsp-2.1"),
+ "org.scalatest" %% "scalatest" % "2.1.7" % "test",
+ "org.specs2" %% "specs2" % "2.3.13" % "test")
+
+parallelExecution in Test := false
+
+pomExtra := childrenPomExtra.value
+
+// Ship dependencies (but not the Scala runtime) in the assembly jar.
+assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true)
+
+assemblyMergeStrategy in assembly := {
+ case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
+ case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat
+ case x =>
+ val oldStrategy = (assemblyMergeStrategy in assembly).value
+ oldStrategy(x)
+}
+
+// skip test in assembly
+test in assembly := {}
+
+// Place the assembly jar under <repo root>/assembly/spark/.
+outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-hbase-assembly-" + version.value + ".jar")
+
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
new file mode 100644
index 0000000..2cdb734
--- /dev/null
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.storage.DataMap
+
+import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.filter.FilterList
+import org.apache.hadoop.hbase.filter.RegexStringComparator
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
+import org.apache.hadoop.hbase.filter.BinaryComparator
+import org.apache.hadoop.hbase.filter.QualifierFilter
+import org.apache.hadoop.hbase.filter.SkipFilter
+
+import org.json4s.DefaultFormats
+import org.json4s.JObject
+import org.json4s.native.Serialization.{ read, write }
+
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+
+import org.apache.commons.codec.binary.Base64
+import java.security.MessageDigest
+
+import java.util.UUID
+
+/* Common utility functions for accessing the events store in HBase:
+ * table naming, row-key encoding/decoding, Event <-> Put/Result
+ * conversion, and Scan construction for queries. */
+object HBEventsUtil {
+
+ implicit val formats = DefaultFormats
+
+ // Table name: "<namespace>:events_<appId>", with an "_<channelId>"
+ // suffix when a channel is specified.
+ def tableName(namespace: String, appId: Int, channelId: Option[Int] = None): String = {
+ channelId.map { ch =>
+ s"${namespace}:events_${appId}_${ch}"
+ }.getOrElse {
+ s"${namespace}:events_${appId}"
+ }
+ }
+
+ // column names for "e" column family (short qualifiers to save space)
+ val colNames: Map[String, Array[Byte]] = Map(
+ "event" -> "e",
+ "entityType" -> "ety",
+ "entityId" -> "eid",
+ "targetEntityType" -> "tety",
+ "targetEntityId" -> "teid",
+ "properties" -> "p",
+ "prId" -> "prid",
+ "eventTime" -> "et",
+ "eventTimeZone" -> "etz",
+ "creationTime" -> "ct",
+ "creationTimeZone" -> "ctz"
+ ).mapValues(Bytes.toBytes(_))
+
+ // MD5 of "<entityType>-<entityId>"; always 16 bytes, which is the
+ // entity-hash prefix length of a RowKey.
+ def hash(entityType: String, entityId: String): Array[Byte] = {
+ val s = entityType + "-" + entityId
+ // get a new MessageDigest object each time for thread-safety
+ val md5 = MessageDigest.getInstance("MD5")
+ md5.digest(Bytes.toBytes(s))
+ }
+
+ // 32-byte row key layout:
+ // bytes 0-15 MD5 hash of entityType/entityId
+ // bytes 16-23 event time in millis (big-endian long)
+ // bytes 24-31 UUID least-significant bits (disambiguates same-millis events)
+ class RowKey(
+ val b: Array[Byte]
+ ) {
+ require((b.size == 32), s"Incorrect b size: ${b.size}")
+ lazy val entityHash: Array[Byte] = b.slice(0, 16)
+ lazy val millis: Long = Bytes.toLong(b.slice(16, 24))
+ lazy val uuidLow: Long = Bytes.toLong(b.slice(24, 32))
+
+ lazy val toBytes: Array[Byte] = b
+
+ // URL-safe Base64 form; this string is the external event ID.
+ override def toString: String = {
+ Base64.encodeBase64URLSafeString(toBytes)
+ }
+ }
+
+ object RowKey {
+ def apply(
+ entityType: String,
+ entityId: String,
+ millis: Long,
+ uuidLow: Long): RowKey = {
+ // add UUID least significant bits for multiple actions at the same time
+ // (UUID's most significant bits are actually timestamp,
+ // use eventTime instead).
+ val b = hash(entityType, entityId) ++
+ Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow)
+ new RowKey(b)
+ }
+
+ // get RowKey from string representation (URL-safe Base64, see toString)
+ def apply(s: String): RowKey = {
+ try {
+ apply(Base64.decodeBase64(s))
+ } catch {
+ case e: Exception => throw new RowKeyException(
+ s"Failed to convert String ${s} to RowKey because ${e}", e)
+ }
+ }
+
+ def apply(b: Array[Byte]): RowKey = {
+ if (b.size != 32) {
+ val bString = b.mkString(",")
+ throw new RowKeyException(
+ s"Incorrect byte array size. Bytes: ${bString}.")
+ }
+ new RowKey(b)
+ }
+
+ }
+
+ // Thrown when a row key cannot be decoded (bad Base64 or wrong length).
+ class RowKeyException(val msg: String, val cause: Exception)
+ extends Exception(msg, cause) {
+ def this(msg: String) = this(msg, null)
+ }
+
+ // Row-key prefix (entity hash, optionally followed by millis) used as
+ // a Scan start/stop boundary.
+ case class PartialRowKey(entityType: String, entityId: String,
+ millis: Option[Long] = None) {
+ val toBytes: Array[Byte] = {
+ hash(entityType, entityId) ++
+ (millis.map(Bytes.toBytes(_)).getOrElse(Array[Byte]()))
+ }
+ }
+
+ /* Convert an Event to an HBase Put under the "e" column family.
+ * Returns the Put together with the RowKey it is keyed on (reusing
+ * event.eventId if present, otherwise generating a fresh key). */
+ def eventToPut(event: Event, appId: Int): (Put, RowKey) = {
+ // generate new rowKey if eventId is None
+ val rowKey = event.eventId.map { id =>
+ RowKey(id) // create rowKey from eventId
+ }.getOrElse {
+ // TODO: use real UUID. not pseudo random
+ val uuidLow: Long = UUID.randomUUID().getLeastSignificantBits
+ RowKey(
+ entityType = event.entityType,
+ entityId = event.entityId,
+ millis = event.eventTime.getMillis,
+ uuidLow = uuidLow
+ )
+ }
+
+ val eBytes = Bytes.toBytes("e")
+ // use eventTime as HBase's cell timestamp
+ val put = new Put(rowKey.toBytes, event.eventTime.getMillis)
+
+ def addStringToE(col: Array[Byte], v: String): Put = {
+ put.add(eBytes, col, Bytes.toBytes(v))
+ }
+
+ def addLongToE(col: Array[Byte], v: Long): Put = {
+ put.add(eBytes, col, Bytes.toBytes(v))
+ }
+
+ addStringToE(colNames("event"), event.event)
+ addStringToE(colNames("entityType"), event.entityType)
+ addStringToE(colNames("entityId"), event.entityId)
+
+ // optional columns are only written when present
+ event.targetEntityType.foreach { targetEntityType =>
+ addStringToE(colNames("targetEntityType"), targetEntityType)
+ }
+
+ event.targetEntityId.foreach { targetEntityId =>
+ addStringToE(colNames("targetEntityId"), targetEntityId)
+ }
+
+ // TODO: make properties Option[]
+ if (!event.properties.isEmpty) {
+ addStringToE(colNames("properties"), write(event.properties.toJObject))
+ }
+
+ event.prId.foreach { prId =>
+ addStringToE(colNames("prId"), prId)
+ }
+
+ addLongToE(colNames("eventTime"), event.eventTime.getMillis)
+ // time zones are only stored when they differ from the default,
+ // so absent columns decode to EventValidation.defaultTimeZone
+ val eventTimeZone = event.eventTime.getZone
+ if (!eventTimeZone.equals(EventValidation.defaultTimeZone)) {
+ addStringToE(colNames("eventTimeZone"), eventTimeZone.getID)
+ }
+
+ addLongToE(colNames("creationTime"), event.creationTime.getMillis)
+ val creationTimeZone = event.creationTime.getZone
+ if (!creationTimeZone.equals(EventValidation.defaultTimeZone)) {
+ addStringToE(colNames("creationTimeZone"), creationTimeZone.getID)
+ }
+
+ // can use zero-length byte array for tag cell value
+ (put, rowKey)
+ }
+
+ /* Decode an HBase Result back into an Event. Mandatory columns
+ * (event, entityType, entityId, eventTime, creationTime) throw via
+ * require() when missing; optional columns map to None/defaults. */
+ def resultToEvent(result: Result, appId: Int): Event = {
+ val rowKey = RowKey(result.getRow())
+
+ val eBytes = Bytes.toBytes("e")
+ // val e = result.getFamilyMap(eBytes)
+
+ def getStringCol(col: String): String = {
+ val r = result.getValue(eBytes, colNames(col))
+ require(r != null,
+ s"Failed to get value for column ${col}. " +
+ s"Rowkey: ${rowKey.toString} " +
+ s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.")
+
+ Bytes.toString(r)
+ }
+
+ def getLongCol(col: String): Long = {
+ val r = result.getValue(eBytes, colNames(col))
+ require(r != null,
+ s"Failed to get value for column ${col}. " +
+ s"Rowkey: ${rowKey.toString} " +
+ s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.")
+
+ Bytes.toLong(r)
+ }
+
+ def getOptStringCol(col: String): Option[String] = {
+ val r = result.getValue(eBytes, colNames(col))
+ if (r == null) {
+ None
+ } else {
+ Some(Bytes.toString(r))
+ }
+ }
+
+ def getTimestamp(col: String): Long = {
+ result.getColumnLatestCell(eBytes, colNames(col)).getTimestamp()
+ }
+
+ val event = getStringCol("event")
+ val entityType = getStringCol("entityType")
+ val entityId = getStringCol("entityId")
+ val targetEntityType = getOptStringCol("targetEntityType")
+ val targetEntityId = getOptStringCol("targetEntityId")
+ val properties: DataMap = getOptStringCol("properties")
+ .map(s => DataMap(read[JObject](s))).getOrElse(DataMap())
+ val prId = getOptStringCol("prId")
+ // missing zone columns mean the default zone (see eventToPut)
+ val eventTimeZone = getOptStringCol("eventTimeZone")
+ .map(DateTimeZone.forID(_))
+ .getOrElse(EventValidation.defaultTimeZone)
+ val eventTime = new DateTime(
+ getLongCol("eventTime"), eventTimeZone)
+ val creationTimeZone = getOptStringCol("creationTimeZone")
+ .map(DateTimeZone.forID(_))
+ .getOrElse(EventValidation.defaultTimeZone)
+ val creationTime: DateTime = new DateTime(
+ getLongCol("creationTime"), creationTimeZone)
+
+ Event(
+ eventId = Some(RowKey(result.getRow()).toString),
+ event = event,
+ entityType = entityType,
+ entityId = entityId,
+ targetEntityType = targetEntityType,
+ targetEntityId = targetEntityId,
+ properties = properties,
+ eventTime = eventTime,
+ tags = Seq(),
+ prId = prId,
+ creationTime = creationTime
+ )
+ }
+
+
+ /* Build a Scan for querying events.
+ * for mandatory field. None means don't care.
+ * for optional field. None means don't care.
+ * Some(None) means not exist.
+ * Some(Some(x)) means it should match x */
+ def createScan(
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ entityType: Option[String] = None,
+ entityId: Option[String] = None,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None,
+ reversed: Option[Boolean] = None): Scan = {
+
+ val scan: Scan = new Scan()
+
+ (entityType, entityId) match {
+ case (Some(et), Some(eid)) => {
+ // both entity type and id known: constrain the scan by row-key
+ // range (fast prefix scan) instead of a time-range filter
+ val start = PartialRowKey(et, eid,
+ startTime.map(_.getMillis)).toBytes
+ // if no untilTime, stop when reach next bytes of entityTypeAndId
+ // (-1L encodes as eight 0xFF bytes, the max suffix for this hash)
+ val stop = PartialRowKey(et, eid,
+ untilTime.map(_.getMillis).orElse(Some(-1))).toBytes
+
+ if (reversed.getOrElse(false)) {
+ // Reversed order.
+ // If you specify a startRow and stopRow,
+ // to scan in reverse, the startRow needs to be lexicographically
+ // after the stopRow.
+ scan.setStartRow(stop)
+ scan.setStopRow(start)
+ scan.setReversed(true)
+ } else {
+ scan.setStartRow(start)
+ scan.setStopRow(stop)
+ }
+ }
+ case (_, _) => {
+ // no full entity key: fall back to a cell-timestamp range scan
+ val minTime: Long = startTime.map(_.getMillis).getOrElse(0)
+ val maxTime: Long = untilTime.map(_.getMillis).getOrElse(Long.MaxValue)
+ scan.setTimeRange(minTime, maxTime)
+ if (reversed.getOrElse(false)) {
+ scan.setReversed(true)
+ }
+ }
+ }
+
+ val filters = new FilterList(FilterList.Operator.MUST_PASS_ALL)
+
+ val eBytes = Bytes.toBytes("e")
+
+ def createBinaryFilter(col: String, value: Array[Byte]): SingleColumnValueFilter = {
+ val comp = new BinaryComparator(value)
+ new SingleColumnValueFilter(
+ eBytes, colNames(col), CompareOp.EQUAL, comp)
+ }
+
+ // skip the row if the column exists
+ def createSkipRowIfColumnExistFilter(col: String): SkipFilter = {
+ val comp = new BinaryComparator(colNames(col))
+ val q = new QualifierFilter(CompareOp.NOT_EQUAL, comp)
+ // filters an entire row if any of the Cell checks do not pass
+ new SkipFilter(q)
+ }
+
+ entityType.foreach { et =>
+ val compType = new BinaryComparator(Bytes.toBytes(et))
+ val filterType = new SingleColumnValueFilter(
+ eBytes, colNames("entityType"), CompareOp.EQUAL, compType)
+ filters.addFilter(filterType)
+ }
+
+ entityId.foreach { eid =>
+ val compId = new BinaryComparator(Bytes.toBytes(eid))
+ val filterId = new SingleColumnValueFilter(
+ eBytes, colNames("entityId"), CompareOp.EQUAL, compId)
+ filters.addFilter(filterId)
+ }
+
+ eventNames.foreach { eventsList =>
+ // match any of event in the eventsList
+ val eventFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE)
+ eventsList.foreach { e =>
+ val compEvent = new BinaryComparator(Bytes.toBytes(e))
+ val filterEvent = new SingleColumnValueFilter(
+ eBytes, colNames("event"), CompareOp.EQUAL, compEvent)
+ eventFilters.addFilter(filterEvent)
+ }
+ if (!eventFilters.getFilters().isEmpty) {
+ filters.addFilter(eventFilters)
+ }
+ }
+
+ targetEntityType.foreach { tetOpt =>
+ if (tetOpt.isEmpty) {
+ val filter = createSkipRowIfColumnExistFilter("targetEntityType")
+ filters.addFilter(filter)
+ } else {
+ tetOpt.foreach { tet =>
+ val filter = createBinaryFilter(
+ "targetEntityType", Bytes.toBytes(tet))
+ // the entire row will be skipped if the column is not found.
+ filter.setFilterIfMissing(true)
+ filters.addFilter(filter)
+ }
+ }
+ }
+
+ targetEntityId.foreach { teidOpt =>
+ if (teidOpt.isEmpty) {
+ val filter = createSkipRowIfColumnExistFilter("targetEntityId")
+ filters.addFilter(filter)
+ } else {
+ teidOpt.foreach { teid =>
+ val filter = createBinaryFilter(
+ "targetEntityId", Bytes.toBytes(teid))
+ // the entire row will be skipped if the column is not found.
+ filter.setFilterIfMissing(true)
+ filters.addFilter(filter)
+ }
+ }
+ }
+
+ if (!filters.getFilters().isEmpty) {
+ scan.setFilter(filters)
+ }
+
+ scan
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
new file mode 100644
index 0000000..360b007
--- /dev/null
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.hbase.HBEventsUtil.RowKey
+import org.apache.hadoop.hbase.HColumnDescriptor
+import org.apache.hadoop.hbase.HTableDescriptor
+import org.apache.hadoop.hbase.NamespaceDescriptor
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.joda.time.DateTime
+
+import scala.collection.JavaConversions._
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+/** HBase implementation of the local (single-JVM) events store.
+ * Each app/channel maps to its own HBase table (see HBEventsUtil.tableName);
+ * all event data lives in the "e" column family.
+ */
+class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace: String)
+ extends LEvents with Logging {
+
+ // implicit val formats = DefaultFormats + new EventJson4sSupport.DBSerializer
+
+ def resultToEvent(result: Result, appId: Int): Event =
+ HBEventsUtil.resultToEvent(result, appId)
+
+ // Obtains a fresh table handle; callers are responsible for closing it.
+ def getTable(appId: Int, channelId: Option[Int] = None): HTableInterface =
+ client.connection.getTable(HBEventsUtil.tableName(namespace, appId, channelId))
+
+ // Creates the namespace and events table if they do not exist yet.
+ override
+ def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+ // check namespace exist
+ val existingNamespace = client.admin.listNamespaceDescriptors()
+ .map(_.getName)
+ if (!existingNamespace.contains(namespace)) {
+ val nameDesc = NamespaceDescriptor.create(namespace).build()
+ info(s"The namespace ${namespace} doesn't exist yet. Creating now...")
+ client.admin.createNamespace(nameDesc)
+ }
+
+ val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId))
+ if (!client.admin.tableExists(tableName)) {
+ info(s"The table ${tableName.getNameAsString()} doesn't exist yet." +
+ " Creating now...")
+ val tableDesc = new HTableDescriptor(tableName)
+ tableDesc.addFamily(new HColumnDescriptor("e"))
+ tableDesc.addFamily(new HColumnDescriptor("r")) // reserved
+ client.admin.createTable(tableDesc)
+ }
+ true
+ }
+
+ // Disables and drops the events table; returns false on failure.
+ override
+ def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
+ val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId))
+ try {
+ if (client.admin.tableExists(tableName)) {
+ info(s"Removing table ${tableName.getNameAsString()}...")
+ client.admin.disableTable(tableName)
+ client.admin.deleteTable(tableName)
+ } else {
+ info(s"Table ${tableName.getNameAsString()} doesn't exist." +
+ s" Nothing is deleted.")
+ }
+ true
+ } catch {
+ case e: Exception => {
+ error(s"Fail to remove table for appId ${appId}. Exception: ${e}")
+ false
+ }
+ }
+ }
+
+ override
+ def close(): Unit = {
+ client.admin.close()
+ client.connection.close()
+ }
+
+ // Writes one event and returns its row-key string (the event ID).
+ override
+ def futureInsert(
+ event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
+ Future[String] = {
+ Future {
+ val table = getTable(appId, channelId)
+ val (put, rowKey) = HBEventsUtil.eventToPut(event, appId)
+ table.put(put)
+ table.flushCommits()
+ table.close()
+ rowKey.toString
+ }
+ }
+
+ override
+ def futureGet(
+ eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
+ Future[Option[Event]] = {
+ Future {
+ val table = getTable(appId, channelId)
+ val rowKey = RowKey(eventId)
+ val get = new Get(rowKey.toBytes)
+
+ val result = table.get(get)
+ table.close()
+
+ if (!result.isEmpty()) {
+ val event = resultToEvent(result, appId)
+ Some(event)
+ } else {
+ None
+ }
+ }
+ }
+
+ // Returns whether the event existed before deletion.
+ // NOTE(review): exists + delete is not atomic; a concurrent writer
+ // could change the outcome between the two calls -- confirm acceptable.
+ override
+ def futureDelete(
+ eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
+ Future[Boolean] = {
+ Future {
+ val table = getTable(appId, channelId)
+ val rowKey = RowKey(eventId)
+ val exists = table.exists(new Get(rowKey.toBytes))
+ table.delete(new Delete(rowKey.toBytes))
+ table.close()
+ exists
+ }
+ }
+
+ override
+ def futureFind(
+ appId: Int,
+ channelId: Option[Int] = None,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ entityType: Option[String] = None,
+ entityId: Option[String] = None,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None,
+ limit: Option[Int] = None,
+ reversed: Option[Boolean] = None)(implicit ec: ExecutionContext):
+ Future[Iterator[Event]] = {
+ Future {
+
+ require(!((reversed == Some(true)) && (entityType.isEmpty || entityId.isEmpty)),
+ "the parameter reversed can only be used with both entityType and entityId specified.")
+
+ val table = getTable(appId, channelId)
+
+ val scan = HBEventsUtil.createScan(
+ startTime = startTime,
+ untilTime = untilTime,
+ entityType = entityType,
+ entityId = entityId,
+ eventNames = eventNames,
+ targetEntityType = targetEntityType,
+ targetEntityId = targetEntityId,
+ reversed = reversed)
+ val scanner = table.getScanner(scan)
+ // NOTE(review): the table is closed before the lazy scanner iterator
+ // is consumed, and the scanner itself is never closed -- verify the
+ // iterator remains valid after table.close() and that no scanner
+ // resources leak.
+ table.close()
+
+ val eventsIter = scanner.iterator()
+
+ // Get all events if None or Some(-1)
+ val results: Iterator[Result] = limit match {
+ case Some(-1) => eventsIter
+ case None => eventsIter
+ case Some(x) => eventsIter.take(x)
+ }
+
+ val eventsIt = results.map { resultToEvent(_, appId) }
+
+ eventsIt
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala
new file mode 100644
index 0000000..7324fa6
--- /dev/null
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.client.{Delete, HTable, Result}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce.OutputFormat
+import org.apache.predictionio.data.storage.{Event, PEvents, StorageClientConfig}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.joda.time.DateTime
+
+/** HBase implementation of the parallel (Spark-backed) events store.
+ * Reads use TableInputFormat with a serialized Scan; writes and deletes
+ * go through the Hadoop output formats / per-partition table handles.
+ */
+class HBPEvents(client: HBClient, config: StorageClientConfig, namespace: String) extends PEvents {
+
+ // Fails fast with a descriptive error if the backing table is absent.
+ def checkTableExists(appId: Int, channelId: Option[Int]): Unit = {
+ if (!client.admin.tableExists(HBEventsUtil.tableName(namespace, appId, channelId))) {
+ if (channelId.nonEmpty) {
+ logger.error(s"The appId $appId with channelId $channelId does not exist." +
+ s" Please use valid appId and channelId.")
+ throw new Exception(s"HBase table not found for appId $appId" +
+ s" with channelId $channelId.")
+ } else {
+ logger.error(s"The appId $appId does not exist. Please use valid appId.")
+ throw new Exception(s"HBase table not found for appId $appId.")
+ }
+ }
+ }
+
+ // Builds an RDD of events matching the given constraints
+ // (see HBEventsUtil.createScan for filter semantics).
+ override
+ def find(
+ appId: Int,
+ channelId: Option[Int] = None,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ entityType: Option[String] = None,
+ entityId: Option[String] = None,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None
+ )(sc: SparkContext): RDD[Event] = {
+
+ checkTableExists(appId, channelId)
+
+ val conf = HBaseConfiguration.create()
+ conf.set(TableInputFormat.INPUT_TABLE,
+ HBEventsUtil.tableName(namespace, appId, channelId))
+
+ val scan = HBEventsUtil.createScan(
+ startTime = startTime,
+ untilTime = untilTime,
+ entityType = entityType,
+ entityId = entityId,
+ eventNames = eventNames,
+ targetEntityType = targetEntityType,
+ targetEntityId = targetEntityId,
+ reversed = None)
+ scan.setCaching(500) // TODO
+ scan.setCacheBlocks(false) // TODO
+
+ // TableInputFormat takes the Scan as a Base64 protobuf string
+ conf.set(TableInputFormat.SCAN, PIOHBaseUtil.convertScanToString(scan))
+
+ // HBase is not accessed until this rdd is actually used.
+ val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
+ classOf[ImmutableBytesWritable],
+ classOf[Result]).map {
+ case (key, row) => HBEventsUtil.resultToEvent(row, appId)
+ }
+
+ rdd
+ }
+
+ // Persists an RDD of events via TableOutputFormat.
+ override
+ def write(
+ events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+
+ checkTableExists(appId, channelId)
+
+ val conf = HBaseConfiguration.create()
+ conf.set(TableOutputFormat.OUTPUT_TABLE,
+ HBEventsUtil.tableName(namespace, appId, channelId))
+ conf.setClass("mapreduce.outputformat.class",
+ classOf[TableOutputFormat[Object]],
+ classOf[OutputFormat[Object, Writable]])
+
+ events.map { event =>
+ val (put, rowKey) = HBEventsUtil.eventToPut(event, appId)
+ (new ImmutableBytesWritable(rowKey.toBytes), put)
+ }.saveAsNewAPIHadoopDataset(conf)
+
+ }
+
+ // Deletes events by ID, opening one HTable per partition on the executors.
+ def delete(
+ eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+
+ checkTableExists(appId, channelId)
+
+ val tableName = HBEventsUtil.tableName(namespace, appId, channelId)
+
+ eventIds.foreachPartition{ iter =>
+ val conf = HBaseConfiguration.create()
+ conf.set(TableOutputFormat.OUTPUT_TABLE,
+ tableName)
+
+ val table = new HTable(conf, tableName)
+ iter.foreach { id =>
+ val rowKey = HBEventsUtil.RowKey(id)
+ val delete = new Delete(rowKey.b)
+ table.delete(delete)
+ }
+ // NOTE(review): if table.delete throws, this close is skipped --
+ // consider a try/finally here.
+ table.close
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala
new file mode 100644
index 0000000..745fcb9
--- /dev/null
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil
+import org.apache.hadoop.hbase.util.Base64
+
+object PIOHBaseUtil {
+ /*
+ * Serializes a Scan into the Base64-encoded protobuf string expected by
+ * TableInputFormat.SCAN.
+ * Copying this from Apache HBase because of its restrictive scope in 0.98.x
+ */
+ def convertScanToString(scan: Scan): String = {
+ val proto = ProtobufUtil.toScan(scan)
+ Base64.encodeBytes(proto.toByteArray)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala
new file mode 100644
index 0000000..1720410
--- /dev/null
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.MasterNotRunningException
+import org.apache.hadoop.hbase.ZooKeeperConnectionException
+import org.apache.hadoop.hbase.client.HConnectionManager
+import org.apache.hadoop.hbase.client.HConnection
+import org.apache.hadoop.hbase.client.HBaseAdmin
+
+import grizzled.slf4j.Logging
+
/** Bundle of live HBase handles (Hadoop configuration, shared connection,
  * and admin client) created by [[StorageClient]].
  */
case class HBClient(
  conf: Configuration,
  connection: HConnection,
  admin: HBaseAdmin
)
+
/** HBase storage client. On construction it builds an HBase configuration,
  * verifies that HBase is reachable (failing fast with a descriptive log
  * message otherwise), and opens the shared connection exposed via `client`.
  *
  * @param config storage client configuration; `config.test` shortens
  *               retry/timeout settings for test runs
  */
class StorageClient(val config: StorageClientConfig)
  extends BaseStorageClient with Logging {

  val conf = HBaseConfiguration.create()

  if (config.test) {
    // use fewer retries and shorter timeout for test mode
    conf.set("hbase.client.retries.number", "1")
    conf.set("zookeeper.session.timeout", "30000");
    conf.set("zookeeper.recovery.retry", "1")
  }

  // Fail fast with an actionable message before creating any connection.
  // Each case logs a diagnosis and rethrows so construction still fails.
  try {
    HBaseAdmin.checkHBaseAvailable(conf)
  } catch {
    case e: MasterNotRunningException =>
      error("HBase master is not running (ZooKeeper ensemble: " +
        conf.get("hbase.zookeeper.quorum") + "). Please make sure that HBase " +
        "is running properly, and that the configuration is pointing at the " +
        "correct ZooKeeper ensemble.")
      throw e
    case e: ZooKeeperConnectionException =>
      error("Cannot connect to ZooKeeper (ZooKeeper ensemble: " +
        conf.get("hbase.zookeeper.quorum") + "). Please make sure that the " +
        "configuration is pointing at the correct ZooKeeper ensemble. By " +
        "default, HBase manages its own ZooKeeper, so if you have not " +
        "configured HBase to use an external ZooKeeper, that means your " +
        "HBase is not started or configured properly.")
      throw e
    case e: Exception => {
      error("Failed to connect to HBase." +
        " Please check if HBase is running properly.")
      throw e
    }
  }

  // Shared connection; individual tables are obtained from it by the DAOs.
  val connection = HConnectionManager.createConnection(conf)

  val client = HBClient(
    conf = conf,
    connection = connection,
    admin = new HBaseAdmin(connection)
  )

  // Prefix used to resolve HBase-specific implementation classes.
  override
  val prefix = "HB"
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala
new file mode 100644
index 0000000..49bf031
--- /dev/null
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage
+
/** HBase implementation of storage traits, supporting event data only.
  *
  * @group Implementation
  */
package object hbase {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala
new file mode 100644
index 0000000..cc07fa4
--- /dev/null
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase.upgrade
+
+import org.apache.predictionio.annotation.Experimental
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.storage.DataMap
+
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.client.HConnection
+import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.util.Bytes
+
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+
+import org.json4s.DefaultFormats
+import org.json4s.JObject
+import org.json4s.native.Serialization.{ read, write }
+
+import org.apache.commons.codec.binary.Base64
+
+import scala.collection.JavaConversions._
+
/** :: Experimental ::
  * Reader for the HBase event schema written by PredictionIO 0.8.0/0.8.1,
  * kept to support migration of old event data (used by the upgrade tool).
  */
@Experimental
object HB_0_8_0 {

  implicit val formats = DefaultFormats

  /** Scans the 0.8.0-format "events" table in `namespace` for all rows
    * belonging to `appId` and lazily converts each row to an [[Event]].
    *
    * NOTE(review): the table is closed before the lazily-consumed scanner
    * iterator is drained, and the scanner itself is never closed — confirm
    * this is safe on the HBase client version in use.
    */
  def getByAppId(
    connection: HConnection,
    namespace: String,
    appId: Int): Iterator[Event] = {
    val tableName = TableName.valueOf(namespace, "events")
    val table = connection.getTable(tableName)
    // Half-open range [appId, appId + 1) covers every row key of this app.
    val start = PartialRowKey(appId)
    val stop = PartialRowKey(appId + 1)
    val scan = new Scan(start.toBytes, stop.toBytes)
    val scanner = table.getScanner(scan)
    table.close()
    scanner.iterator().map { resultToEvent(_) }
  }

  // Logical field name -> HBase column qualifier (all in column family "e").
  // NOTE(review): mapValues produces a lazy view in Scala 2.x, so
  // Bytes.toBytes is re-evaluated on every lookup — consider forcing it.
  val colNames: Map[String, Array[Byte]] = Map(
    "event" -> "e",
    "entityType" -> "ety",
    "entityId" -> "eid",
    "targetEntityType" -> "tety",
    "targetEntityId" -> "teid",
    "properties" -> "p",
    "prId" -> "pk", // column name is 'pk' in 0.8.0/0.8.1
    "eventTimeZone" -> "etz",
    "creationTimeZone" -> "ctz"
  ).mapValues(Bytes.toBytes(_))


  /** 20-byte row key: 4-byte appId ++ 8-byte event-time millis ++ 8-byte
    * UUID least-significant bits.
    */
  class RowKey(
    val appId: Int,
    val millis: Long,
    val uuidLow: Long
  ) {
    lazy val toBytes: Array[Byte] = {
      // add UUID least significant bits for multiple actions at the same time
      // (UUID's most significant bits are actually timestamp,
      // use eventTime instead).
      Bytes.toBytes(appId) ++ Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow)
    }
    // URL-safe Base64 of the raw key; also used as the external event ID.
    override def toString: String = {
      Base64.encodeBase64URLSafeString(toBytes)
    }
  }

  object RowKey {
    // get RowKey from string representation
    def apply(s: String): RowKey = {
      try {
        apply(Base64.decodeBase64(s))
      } catch {
        case e: Exception => throw new RowKeyException(
          s"Failed to convert String ${s} to RowKey because ${e}", e)
      }
    }

    /** Decodes a raw row key; throws [[RowKeyException]] unless exactly
      * 20 bytes long.
      */
    def apply(b: Array[Byte]): RowKey = {
      if (b.size != 20) {
        val bString = b.mkString(",")
        throw new RowKeyException(
          s"Incorrect byte array size. Bytes: ${bString}.")
      }

      new RowKey(
        appId = Bytes.toInt(b.slice(0, 4)),
        millis = Bytes.toLong(b.slice(4, 12)),
        uuidLow = Bytes.toLong(b.slice(12, 20))
      )
    }
  }

  /** Signals a malformed row key (bad Base64 or wrong byte length). */
  class RowKeyException(msg: String, cause: Exception)
    extends Exception(msg, cause) {
    def this(msg: String) = this(msg, null)
  }

  /** Row-key prefix used as a scan boundary: appId alone, or appId ++ millis. */
  case class PartialRowKey(val appId: Int, val millis: Option[Long] = None) {
    val toBytes: Array[Byte] = {
      Bytes.toBytes(appId) ++
        (millis.map(Bytes.toBytes(_)).getOrElse(Array[Byte]()))
    }
  }

  /** Converts one HBase scan [[Result]] into an [[Event]]. Required columns
    * (event, entityType, entityId) fail with IllegalArgumentException via
    * `require` if absent; all other columns fall back to None/defaults.
    */
  def resultToEvent(result: Result): Event = {
    val rowKey = RowKey(result.getRow())

    val eBytes = Bytes.toBytes("e")
    // val e = result.getFamilyMap(eBytes)

    // Required column: fails loudly, including the row key for debugging.
    def getStringCol(col: String): String = {
      val r = result.getValue(eBytes, colNames(col))
      require(r != null,
        s"Failed to get value for column ${col}. " +
        s"Rowkey: ${rowKey.toString} " +
        s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.")

      Bytes.toString(r)
    }

    // Optional column: None when the cell is absent.
    def getOptStringCol(col: String): Option[String] = {
      val r = result.getValue(eBytes, colNames(col))
      if (r == null) {
        None
      } else {
        Some(Bytes.toString(r))
      }
    }

    // The HBase cell timestamp doubles as the event's creation time.
    def getTimestamp(col: String): Long = {
      result.getColumnLatestCell(eBytes, colNames(col)).getTimestamp()
    }

    val event = getStringCol("event")
    val entityType = getStringCol("entityType")
    val entityId = getStringCol("entityId")
    val targetEntityType = getOptStringCol("targetEntityType")
    val targetEntityId = getOptStringCol("targetEntityId")
    // Properties are stored as a JSON object string.
    val properties: DataMap = getOptStringCol("properties")
      .map(s => DataMap(read[JObject](s))).getOrElse(DataMap())
    val prId = getOptStringCol("prId")
    val eventTimeZone = getOptStringCol("eventTimeZone")
      .map(DateTimeZone.forID(_))
      .getOrElse(EventValidation.defaultTimeZone)
    val creationTimeZone = getOptStringCol("creationTimeZone")
      .map(DateTimeZone.forID(_))
      .getOrElse(EventValidation.defaultTimeZone)

    val creationTime: DateTime = new DateTime(
      getTimestamp("event"), creationTimeZone
    )

    Event(
      eventId = Some(RowKey(result.getRow()).toString),
      event = event,
      entityType = entityType,
      entityId = entityId,
      targetEntityType = targetEntityType,
      targetEntityId = targetEntityId,
      properties = properties,
      // Event time millis are encoded in the row key itself.
      eventTime = new DateTime(rowKey.millis, eventTimeZone),
      tags = Seq(),
      prId = prId,
      creationTime = creationTime
    )
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala
new file mode 100644
index 0000000..1759561
--- /dev/null
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase.upgrade
+
+import org.apache.predictionio.annotation.Experimental
+
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.data.storage.hbase.HBLEvents
+import org.apache.predictionio.data.storage.hbase.HBEventsUtil
+
+import scala.collection.JavaConversions._
+
/** :: Experimental ::
  * Command-line tool that copies event data written by PredictionIO
  * 0.8.0/0.8.1 into the HBase events table of an already-created app.
  */
@Experimental
object Upgrade {

  /** Entry point.
    *
    * Usage: Upgrade &lt;fromAppId&gt; &lt;toAppId&gt; [batchSize] [fromNamespace]
    */
  def main(args: Array[String]) {
    // Print a usage message instead of failing with
    // ArrayIndexOutOfBoundsException when required arguments are missing.
    if (args.length < 2) {
      System.err.println(
        "Usage: Upgrade <fromAppId> <toAppId> [batchSize] [fromNamespace]")
      sys.exit(1)
    }
    val fromAppId = args(0).toInt
    val toAppId = args(1).toInt
    val batchSize = args.lift(2).map(_.toInt).getOrElse(100)
    val fromNamespace = args.lift(3).getOrElse("predictionio_eventdata")

    upgrade(fromAppId, toAppId, batchSize, fromNamespace)
  }

  /* For upgrade from 0.8.0 or 0.8.1 to 0.8.2 only */
  /** Streams events of `fromAppId` from the old-format table in
    * `fromNamespace` and writes them, in batches of `batchSize`, into the
    * current-format table of `toAppId`.
    */
  def upgrade(
    fromAppId: Int,
    toAppId: Int,
    batchSize: Int,
    fromNamespace: String) {

    val events = Storage.getLEvents().asInstanceOf[HBLEvents]

    // Assume already run "pio app new <newapp>" (new app already created)
    // TODO: check if new table empty and warn user if not
    val newTable = events.getTable(toAppId)

    val newTableName = newTable.getName().getNameAsString()
    println(s"Copying data from ${fromNamespace}:events for app ID ${fromAppId}"
      + s" to new HBase table ${newTableName}...")

    // Read old rows lazily and convert/write them in fixed-size batches.
    HB_0_8_0.getByAppId(
      events.client.connection,
      fromNamespace,
      fromAppId).grouped(batchSize).foreach { eventGroup =>
      val puts = eventGroup.map { e =>
        val (put, rowkey) = HBEventsUtil.eventToPut(e, toAppId)
        put
      }
      newTable.put(puts.toList)
    }

    // Flush any client-side write buffer before releasing the table.
    newTable.flushCommits()
    newTable.close()
    println("Done.")
  }

}