Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:41 UTC

[10/34] incubator-predictionio git commit: rename all except examples

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
new file mode 100644
index 0000000..cad72e2
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -0,0 +1,114 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Channel
+import org.apache.predictionio.data.storage.Channels
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders.termFilter
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+class ESChannels(client: Client, config: StorageClientConfig, index: String)
+    extends Channels with Logging {
+
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "channels"
+  private val seq = new ESSequences(client, config, index)
+  private val seqName = "channels"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  def insert(channel: Channel): Option[Int] = {
+    val id =
+      if (channel.id == 0) {
+        var roll = seq.genNext(seqName)
+        while (!get(roll).isEmpty) roll = seq.genNext(seqName)
+        roll
+      } else channel.id
+
+    val realChannel = channel.copy(id = id)
+    if (update(realChannel)) Some(id) else None
+  }
+
+  def get(id: Int): Option[Channel] = {
+    try {
+      val response = client.prepareGet(
+        index,
+        estype,
+        id.toString).get()
+      Some(read[Channel](response.getSourceAsString))
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+      case e: NullPointerException => None
+    }
+  }
+
+  def getByAppid(appid: Int): Seq[Channel] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("appid", appid))
+      ESUtils.getAll[Channel](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[Channel]()
+    }
+  }
+
+  def update(channel: Channel): Boolean = {
+    try {
+      val response = client.prepareIndex(index, estype, channel.id.toString).
+        setSource(write(channel)).get()
+      true
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        false
+    }
+  }
+
+  def delete(id: Int): Unit = {
+    try {
+      client.prepareDelete(index, estype, id.toString).get
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+    }
+  }
+
+}
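
The insert() method above treats id == 0 as "allocate an ID for me": it keeps drawing numbers from ESSequences until it finds one with no existing document. A minimal, hedged usage sketch, reusing the client and config this class is constructed with, and assuming Channel is a case class carrying id, name, and appid (the "pio_meta" index name is illustrative):

    import org.apache.predictionio.data.storage.{Channel, StorageClientConfig}
    import org.elasticsearch.client.Client

    def createChannel(client: Client, config: StorageClientConfig): Unit = {
      val channels = new ESChannels(client, config, "pio_meta")
      // id = 0 asks insert() to pick the next unused sequence number
      channels.insert(Channel(id = 0, name = "mobile", appid = 1)) match {
        case Some(id) => println(s"channel stored with id $id")
        case None     => println("index request failed; see the error log")
      }
    }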

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
new file mode 100644
index 0000000..367e66f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -0,0 +1,155 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineInstance
+import org.apache.predictionio.data.storage.EngineInstanceSerializer
+import org.apache.predictionio.data.storage.EngineInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.elasticsearch.search.sort.SortOrder
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+class ESEngineInstances(client: Client, config: StorageClientConfig, index: String)
+  extends EngineInstances with Logging {
+  implicit val formats = DefaultFormats + new EngineInstanceSerializer
+  private val estype = "engine_instances"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("startTime" -> ("type" -> "date")) ~
+          ("endTime" -> ("type" -> "date")) ~
+          ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineVersion" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineVariant" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineFactory" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("batch" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("dataSourceParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("preparatorParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("algorithmsParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("servingParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  def insert(i: EngineInstance): String = {
+    try {
+      val response = client.prepareIndex(index, estype).
+        setSource(write(i)).get
+      response.getId
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        ""
+    }
+  }
+
+  def get(id: String): Option[EngineInstance] = {
+    try {
+      val response = client.prepareGet(index, estype, id).get
+      if (response.isExists) {
+        Some(read[EngineInstance](response.getSourceAsString))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def getAll(): Seq[EngineInstance] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[EngineInstance](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  def getCompleted(
+      engineId: String,
+      engineVersion: String,
+      engineVariant: String): Seq[EngineInstance] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
+        andFilter(
+          termFilter("status", "COMPLETED"),
+          termFilter("engineId", engineId),
+          termFilter("engineVersion", engineVersion),
+          termFilter("engineVariant", engineVariant))).
+        addSort("startTime", SortOrder.DESC)
+      ESUtils.getAll[EngineInstance](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  def getLatestCompleted(
+      engineId: String,
+      engineVersion: String,
+      engineVariant: String): Option[EngineInstance] =
+    getCompleted(
+      engineId,
+      engineVersion,
+      engineVariant).headOption
+
+  def update(i: EngineInstance): Unit = {
+    try {
+      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+
+  def delete(id: String): Unit = {
+    try {
+      val response = client.prepareDelete(index, estype, id).get
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+}
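
getCompleted() above filters on status == "COMPLETED" plus the engine coordinates and sorts by startTime descending, so getLatestCompleted() is simply its headOption. A hedged usage sketch (client and config as above; the engine coordinates and "pio_meta" index name are placeholders):

    val instances = new ESEngineInstances(client, config, "pio_meta")
    val latest = instances.getLatestCompleted(
      engineId = "my-engine",
      engineVersion = "1.0.0",
      engineVariant = "default")
    latest.foreach(i => println(s"deploying engine instance ${i.id}"))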

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
new file mode 100644
index 0000000..f357d44
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
@@ -0,0 +1,81 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineManifestSerializer
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.data.storage.EngineManifests
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.json4s._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+class ESEngineManifests(client: Client, config: StorageClientConfig, index: String)
+  extends EngineManifests with Logging {
+  implicit val formats = DefaultFormats + new EngineManifestSerializer
+  private val estype = "engine_manifests"
+  private def esid(id: String, version: String) = s"$id $version"
+
+  def insert(engineManifest: EngineManifest): Unit = {
+    val json = write(engineManifest)
+    val response = client.prepareIndex(
+      index,
+      estype,
+      esid(engineManifest.id, engineManifest.version)).
+      setSource(json).execute().actionGet()
+  }
+
+  def get(id: String, version: String): Option[EngineManifest] = {
+    try {
+      val response = client.prepareGet(index, estype, esid(id, version)).
+        execute().actionGet()
+      if (response.isExists) {
+        Some(read[EngineManifest](response.getSourceAsString))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def getAll(): Seq[EngineManifest] = {
+    try {
+      val builder = client.prepareSearch()
+      ESUtils.getAll[EngineManifest](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  def update(engineManifest: EngineManifest, upsert: Boolean = false): Unit =
+    insert(engineManifest)
+
+  def delete(id: String, version: String): Unit = {
+    try {
+      client.prepareDelete(index, estype, esid(id, version)).execute().actionGet()
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+}
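
Manifests are keyed by esid(id, version), i.e. the string "id version", so indexing again under the same key overwrites the document; that is why update() can simply delegate to insert(). A short sketch (m is assumed to be an EngineManifest; "pio_meta" is illustrative):

    val manifests = new ESEngineManifests(client, config, "pio_meta")
    manifests.insert(m)                  // indexes under esid(m.id, m.version)
    manifests.update(m)                  // same path: overwrites that document
    val roundTrip = manifests.get(m.id, m.version)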

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
new file mode 100644
index 0000000..c78378f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -0,0 +1,133 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EvaluationInstance
+import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
+import org.apache.predictionio.data.storage.EvaluationInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.elasticsearch.search.sort.SortOrder
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String)
+  extends EvaluationInstances with Logging {
+  implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
+  private val estype = "evaluation_instances"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
+      (estype ->
+        ("properties" ->
+          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("startTime" -> ("type" -> "date")) ~
+          ("endTime" -> ("type" -> "date")) ~
+          ("evaluationClass" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineParamsGeneratorClass" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("batch" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("evaluatorResults" ->
+            ("type" -> "string") ~ ("index" -> "no")) ~
+          ("evaluatorResultsHTML" ->
+            ("type" -> "string") ~ ("index" -> "no")) ~
+          ("evaluatorResultsJSON" ->
+            ("type" -> "string") ~ ("index" -> "no"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
+  }
+
+  def insert(i: EvaluationInstance): String = {
+    try {
+      val response = client.prepareIndex(index, estype).
+        setSource(write(i)).get
+      response.getId
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        ""
+    }
+  }
+
+  def get(id: String): Option[EvaluationInstance] = {
+    try {
+      val response = client.prepareGet(index, estype, id).get
+      if (response.isExists) {
+        Some(read[EvaluationInstance](response.getSourceAsString))
+      } else {
+        None
+      }
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def getAll(): Seq[EvaluationInstance] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[EvaluationInstance](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  def getCompleted(): Seq[EvaluationInstance] = {
+    try {
+      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
+        termFilter("status", "EVALCOMPLETED")).
+        addSort("startTime", SortOrder.DESC)
+      ESUtils.getAll[EvaluationInstance](client, builder)
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
+    }
+  }
+
+  def update(i: EvaluationInstance): Unit = {
+    try {
+      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+
+  def delete(id: String): Unit = {
+    try {
+      client.prepareDelete(index, estype, id).get
+    } catch {
+      case e: ElasticsearchException => error(e.getMessage)
+    }
+  }
+}
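
Note that the evaluator result fields are mapped with "index" -> "no": they are kept in the document source but are not searchable, and queries only filter on status and sort on startTime. A hedged sketch of reading the newest completed evaluation (the evaluatorResults field name is taken from the mapping above; "pio_meta" is illustrative):

    val evals = new ESEvaluationInstances(client, config, "pio_meta")
    val newest = evals.getCompleted().headOption  // "EVALCOMPLETED", newest first
    newest.foreach(e => println(e.evaluatorResults))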

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
new file mode 100644
index 0000000..78d43ac
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -0,0 +1,61 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+
+class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging {
+  implicit val formats = DefaultFormats
+  private val estype = "sequences"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    // val settingsJson =
+    //   ("number_of_shards" -> 1) ~
+    //   ("auto_expand_replicas" -> "0-all")
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val mappingJson =
+      (estype ->
+        ("_source" -> ("enabled" -> 0)) ~
+        ("_all" -> ("enabled" -> 0)) ~
+        ("_type" -> ("index" -> "no")) ~
+        ("enabled" -> 0))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(mappingJson))).get
+  }
+
+  def genNext(name: String): Int = {
+    try {
+      val response = client.prepareIndex(index, estype, name).
+        setSource(compact(render("n" -> name))).get
+      response.getVersion().toInt
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        0
+    }
+  }
+}
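
ESSequences implements a counter without any read-modify-write cycle: each call to genNext() re-indexes the same document (one per sequence name) and returns Elasticsearch's _version for it, which increments atomically on every index operation. A hedged sketch ("pio_meta" is illustrative):

    val seq = new ESSequences(client, config, "pio_meta")
    val first = seq.genNext("channels")   // e.g. 1 on a fresh index
    val second = seq.genNext("channels")  // version bumped by the re-index
    // monotonic per name, assuming no errors (genNext returns 0 on failure)
    assert(second > first)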

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
new file mode 100644
index 0000000..8410458
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -0,0 +1,45 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import org.elasticsearch.action.search.SearchRequestBuilder
+import org.elasticsearch.client.Client
+import org.elasticsearch.common.unit.TimeValue
+import org.json4s.Formats
+import org.json4s.native.Serialization.read
+
+import scala.collection.mutable.ArrayBuffer
+
+object ESUtils {
+  val scrollLife = new TimeValue(60000)
+
+  def getAll[T : Manifest](
+      client: Client,
+      builder: SearchRequestBuilder)(
+      implicit formats: Formats): Seq[T] = {
+    val results = ArrayBuffer[T]()
+    var response = builder.setScroll(scrollLife).get
+    var hits = response.getHits().hits()
+    results ++= hits.map(h => read[T](h.getSourceAsString))
+    while (hits.size > 0) {
+      response = client.prepareSearchScroll(response.getScrollId).
+        setScroll(scrollLife).get
+      hits = response.getHits().hits()
+      results ++= hits.map(h => read[T](h.getSourceAsString))
+    }
+    results
+  }
+}
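
getAll() pages through a result set with the scroll API: the first search opens a scroll context, then prepareSearchScroll is called repeatedly until a page comes back empty. A hedged usage sketch, reusing the index and type names from the stores above:

    import org.json4s.DefaultFormats
    import org.apache.predictionio.data.storage.{EngineInstance,
      EngineInstanceSerializer}

    implicit val formats = DefaultFormats + new EngineInstanceSerializer
    val builder = client.prepareSearch("pio_meta").setTypes("engine_instances")
    val everything: Seq[EngineInstance] =
      ESUtils.getAll[EngineInstance](client, builder)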

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
new file mode 100644
index 0000000..8b57620
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
@@ -0,0 +1,47 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.transport.TransportClient
+import org.elasticsearch.common.settings.ImmutableSettings
+import org.elasticsearch.common.transport.InetSocketTransportAddress
+import org.elasticsearch.transport.ConnectTransportException
+
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+    with Logging {
+  override val prefix = "ES"
+  val client = try {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300))
+    val settings = ImmutableSettings.settingsBuilder()
+      .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch"))
+    val transportClient = new TransportClient(settings)
+    (hosts zip ports) foreach { hp =>
+      transportClient.addTransportAddress(
+        new InetSocketTransportAddress(hp._1, hp._2))
+    }
+    transportClient
+  } catch {
+    case e: ConnectTransportException =>
+      throw new StorageClientException(e.getMessage, e)
+  }
+}
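
The client pairs HOSTS with PORTS positionally and falls back to localhost:9300 when neither is set; CLUSTERNAME defaults to "elasticsearch". A tiny sketch of the pairing semantics (host names are placeholders):

    val hosts = "es1.internal,es2.internal".split(",").toSeq
    val ports = "9300,9301".split(",").toSeq.map(_.toInt)
    (hosts zip ports) foreach { case (h, p) =>
      println(s"transport address $h:$p")  // es1.internal:9300, es2.internal:9301
    }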

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
new file mode 100644
index 0000000..404bdda
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
@@ -0,0 +1,22 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+/** Elasticsearch implementation of storage traits, supporting meta data only
+  *
+  * @group Implementation
+  */
+package object elasticsearch {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
new file mode 100644
index 0000000..47f86a1
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
@@ -0,0 +1,412 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.storage.DataMap
+
+import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.filter.FilterList
+import org.apache.hadoop.hbase.filter.RegexStringComparator
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
+import org.apache.hadoop.hbase.filter.BinaryComparator
+import org.apache.hadoop.hbase.filter.QualifierFilter
+import org.apache.hadoop.hbase.filter.SkipFilter
+
+import org.json4s.DefaultFormats
+import org.json4s.JObject
+import org.json4s.native.Serialization.{ read, write }
+
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+
+import org.apache.commons.codec.binary.Base64
+import java.security.MessageDigest
+
+import java.util.UUID
+
+/* Common utility functions for accessing the events store in HBase. */
+object HBEventsUtil {
+
+  implicit val formats = DefaultFormats
+
+  def tableName(namespace: String, appId: Int, channelId: Option[Int] = None): String = {
+    channelId.map { ch =>
+      s"${namespace}:events_${appId}_${ch}"
+    }.getOrElse {
+      s"${namespace}:events_${appId}"
+    }
+  }
+
+  // column names for "e" column family
+  val colNames: Map[String, Array[Byte]] = Map(
+    "event" -> "e",
+    "entityType" -> "ety",
+    "entityId" -> "eid",
+    "targetEntityType" -> "tety",
+    "targetEntityId" -> "teid",
+    "properties" -> "p",
+    "prId" -> "prid",
+    "eventTime" -> "et",
+    "eventTimeZone" -> "etz",
+    "creationTime" -> "ct",
+    "creationTimeZone" -> "ctz"
+  ).mapValues(Bytes.toBytes(_))
+
+  def hash(entityType: String, entityId: String): Array[Byte] = {
+    val s = entityType + "-" + entityId
+    // get a new MessageDigest object each time for thread safety
+    val md5 = MessageDigest.getInstance("MD5")
+    md5.digest(Bytes.toBytes(s))
+  }
+
+  class RowKey(
+    val b: Array[Byte]
+  ) {
+    require((b.size == 32), s"Incorrect b size: ${b.size}")
+    lazy val entityHash: Array[Byte] = b.slice(0, 16)
+    lazy val millis: Long = Bytes.toLong(b.slice(16, 24))
+    lazy val uuidLow: Long = Bytes.toLong(b.slice(24, 32))
+
+    lazy val toBytes: Array[Byte] = b
+
+    override def toString: String = {
+      Base64.encodeBase64URLSafeString(toBytes)
+    }
+  }
+
+  object RowKey {
+    def apply(
+      entityType: String,
+      entityId: String,
+      millis: Long,
+      uuidLow: Long): RowKey = {
+        // Append the UUID's least significant bits to disambiguate multiple
+        // events that share the same timestamp. (The most significant bits
+        // of a UUID encode a timestamp, which eventTime already provides.)
+        val b = hash(entityType, entityId) ++
+          Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow)
+        new RowKey(b)
+      }
+
+    // get RowKey from string representation
+    def apply(s: String): RowKey = {
+      try {
+        apply(Base64.decodeBase64(s))
+      } catch {
+        case e: Exception => throw new RowKeyException(
+          s"Failed to convert String ${s} to RowKey because ${e}", e)
+      }
+    }
+
+    def apply(b: Array[Byte]): RowKey = {
+      if (b.size != 32) {
+        val bString = b.mkString(",")
+        throw new RowKeyException(
+          s"Incorrect byte array size. Bytes: ${bString}.")
+      }
+      new RowKey(b)
+    }
+
+  }
+
+  class RowKeyException(val msg: String, val cause: Exception)
+    extends Exception(msg, cause) {
+      def this(msg: String) = this(msg, null)
+    }
+
+  case class PartialRowKey(entityType: String, entityId: String,
+    millis: Option[Long] = None) {
+    val toBytes: Array[Byte] = {
+      hash(entityType, entityId) ++
+        (millis.map(Bytes.toBytes(_)).getOrElse(Array[Byte]()))
+    }
+  }
+
+  def eventToPut(event: Event, appId: Int): (Put, RowKey) = {
+    // generate new rowKey if eventId is None
+    val rowKey = event.eventId.map { id =>
+      RowKey(id) // create rowKey from eventId
+    }.getOrElse {
+      // TODO: use a real UUID, not pseudo-random bits
+      val uuidLow: Long = UUID.randomUUID().getLeastSignificantBits
+      RowKey(
+        entityType = event.entityType,
+        entityId = event.entityId,
+        millis = event.eventTime.getMillis,
+        uuidLow = uuidLow
+      )
+    }
+
+    val eBytes = Bytes.toBytes("e")
+    // use eventTime as HBase's cell timestamp
+    val put = new Put(rowKey.toBytes, event.eventTime.getMillis)
+
+    def addStringToE(col: Array[Byte], v: String): Put = {
+      put.add(eBytes, col, Bytes.toBytes(v))
+    }
+
+    def addLongToE(col: Array[Byte], v: Long): Put = {
+      put.add(eBytes, col, Bytes.toBytes(v))
+    }
+
+    addStringToE(colNames("event"), event.event)
+    addStringToE(colNames("entityType"), event.entityType)
+    addStringToE(colNames("entityId"), event.entityId)
+
+    event.targetEntityType.foreach { targetEntityType =>
+      addStringToE(colNames("targetEntityType"), targetEntityType)
+    }
+
+    event.targetEntityId.foreach { targetEntityId =>
+      addStringToE(colNames("targetEntityId"), targetEntityId)
+    }
+
+    // TODO: make properties Option[]
+    if (!event.properties.isEmpty) {
+      addStringToE(colNames("properties"), write(event.properties.toJObject))
+    }
+
+    event.prId.foreach { prId =>
+      addStringToE(colNames("prId"), prId)
+    }
+
+    addLongToE(colNames("eventTime"), event.eventTime.getMillis)
+    val eventTimeZone = event.eventTime.getZone
+    if (!eventTimeZone.equals(EventValidation.defaultTimeZone)) {
+      addStringToE(colNames("eventTimeZone"), eventTimeZone.getID)
+    }
+
+    addLongToE(colNames("creationTime"), event.creationTime.getMillis)
+    val creationTimeZone = event.creationTime.getZone
+    if (!creationTimeZone.equals(EventValidation.defaultTimeZone)) {
+      addStringToE(colNames("creationTimeZone"), creationTimeZone.getID)
+    }
+
+    // can use zero-length byte array for tag cell value
+    (put, rowKey)
+  }
+
+  def resultToEvent(result: Result, appId: Int): Event = {
+    val rowKey = RowKey(result.getRow())
+
+    val eBytes = Bytes.toBytes("e")
+    // val e = result.getFamilyMap(eBytes)
+
+    def getStringCol(col: String): String = {
+      val r = result.getValue(eBytes, colNames(col))
+      require(r != null,
+        s"Failed to get value for column ${col}. " +
+        s"Rowkey: ${rowKey.toString} " +
+        s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.")
+
+      Bytes.toString(r)
+    }
+
+    def getLongCol(col: String): Long = {
+      val r = result.getValue(eBytes, colNames(col))
+      require(r != null,
+        s"Failed to get value for column ${col}. " +
+        s"Rowkey: ${rowKey.toString} " +
+        s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.")
+
+      Bytes.toLong(r)
+    }
+
+    def getOptStringCol(col: String): Option[String] = {
+      val r = result.getValue(eBytes, colNames(col))
+      if (r == null) {
+        None
+      } else {
+        Some(Bytes.toString(r))
+      }
+    }
+
+    def getTimestamp(col: String): Long = {
+      result.getColumnLatestCell(eBytes, colNames(col)).getTimestamp()
+    }
+
+    val event = getStringCol("event")
+    val entityType = getStringCol("entityType")
+    val entityId = getStringCol("entityId")
+    val targetEntityType = getOptStringCol("targetEntityType")
+    val targetEntityId = getOptStringCol("targetEntityId")
+    val properties: DataMap = getOptStringCol("properties")
+      .map(s => DataMap(read[JObject](s))).getOrElse(DataMap())
+    val prId = getOptStringCol("prId")
+    val eventTimeZone = getOptStringCol("eventTimeZone")
+      .map(DateTimeZone.forID(_))
+      .getOrElse(EventValidation.defaultTimeZone)
+    val eventTime = new DateTime(
+      getLongCol("eventTime"), eventTimeZone)
+    val creationTimeZone = getOptStringCol("creationTimeZone")
+      .map(DateTimeZone.forID(_))
+      .getOrElse(EventValidation.defaultTimeZone)
+    val creationTime: DateTime = new DateTime(
+      getLongCol("creationTime"), creationTimeZone)
+
+    Event(
+      eventId = Some(RowKey(result.getRow()).toString),
+      event = event,
+      entityType = entityType,
+      entityId = entityId,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      properties = properties,
+      eventTime = eventTime,
+      tags = Seq(),
+      prId = prId,
+      creationTime = creationTime
+    )
+  }
+
+
+  // For mandatory fields, None means "don't care".
+  // For optional fields (Option[Option[T]]):
+  //   None means "don't care",
+  //   Some(None) means the field must not exist,
+  //   Some(Some(x)) means the field must equal x.
+  def createScan(
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    reversed: Option[Boolean] = None): Scan = {
+
+    val scan: Scan = new Scan()
+
+    (entityType, entityId) match {
+      case (Some(et), Some(eid)) => {
+        val start = PartialRowKey(et, eid,
+          startTime.map(_.getMillis)).toBytes
+        // if no untilTime is given, use -1 (all 0xFF bytes) as the stop key
+        // so the scan covers the whole entityType/entityId prefix
+        val stop = PartialRowKey(et, eid,
+          untilTime.map(_.getMillis).orElse(Some(-1))).toBytes
+
+        if (reversed.getOrElse(false)) {
+          // Reversed order.
+          // If you specify a startRow and stopRow,
+          // to scan in reverse, the startRow needs to be lexicographically
+          // after the stopRow.
+          scan.setStartRow(stop)
+          scan.setStopRow(start)
+          scan.setReversed(true)
+        } else {
+          scan.setStartRow(start)
+          scan.setStopRow(stop)
+        }
+      }
+      case (_, _) => {
+        val minTime: Long = startTime.map(_.getMillis).getOrElse(0)
+        val maxTime: Long = untilTime.map(_.getMillis).getOrElse(Long.MaxValue)
+        scan.setTimeRange(minTime, maxTime)
+        if (reversed.getOrElse(false)) {
+          scan.setReversed(true)
+        }
+      }
+    }
+
+    val filters = new FilterList(FilterList.Operator.MUST_PASS_ALL)
+
+    val eBytes = Bytes.toBytes("e")
+
+    def createBinaryFilter(col: String, value: Array[Byte]): SingleColumnValueFilter = {
+      val comp = new BinaryComparator(value)
+      new SingleColumnValueFilter(
+        eBytes, colNames(col), CompareOp.EQUAL, comp)
+    }
+
+    // skip the row if the column exists
+    def createSkipRowIfColumnExistFilter(col: String): SkipFilter = {
+      val comp = new BinaryComparator(colNames(col))
+      val q = new QualifierFilter(CompareOp.NOT_EQUAL, comp)
+      // filters an entire row if any of the Cell checks do not pass
+      new SkipFilter(q)
+    }
+
+    entityType.foreach { et =>
+      val compType = new BinaryComparator(Bytes.toBytes(et))
+      val filterType = new SingleColumnValueFilter(
+        eBytes, colNames("entityType"), CompareOp.EQUAL, compType)
+      filters.addFilter(filterType)
+    }
+
+    entityId.foreach { eid =>
+      val compId = new BinaryComparator(Bytes.toBytes(eid))
+      val filterId = new SingleColumnValueFilter(
+        eBytes, colNames("entityId"), CompareOp.EQUAL, compId)
+      filters.addFilter(filterId)
+    }
+
+    eventNames.foreach { eventsList =>
+      // match any event in the eventsList
+      val eventFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE)
+      eventsList.foreach { e =>
+        val compEvent = new BinaryComparator(Bytes.toBytes(e))
+        val filterEvent = new SingleColumnValueFilter(
+          eBytes, colNames("event"), CompareOp.EQUAL, compEvent)
+        eventFilters.addFilter(filterEvent)
+      }
+      if (!eventFilters.getFilters().isEmpty) {
+        filters.addFilter(eventFilters)
+      }
+    }
+
+    targetEntityType.foreach { tetOpt =>
+      if (tetOpt.isEmpty) {
+        val filter = createSkipRowIfColumnExistFilter("targetEntityType")
+        filters.addFilter(filter)
+      } else {
+        tetOpt.foreach { tet =>
+          val filter = createBinaryFilter(
+            "targetEntityType", Bytes.toBytes(tet))
+          // the entire row will be skipped if the column is not found.
+          filter.setFilterIfMissing(true)
+          filters.addFilter(filter)
+        }
+      }
+    }
+
+    targetEntityId.foreach { teidOpt =>
+      if (teidOpt.isEmpty) {
+        val filter = createSkipRowIfColumnExistFilter("targetEntityId")
+        filters.addFilter(filter)
+      } else {
+        teidOpt.foreach { teid =>
+          val filter = createBinaryFilter(
+            "targetEntityId", Bytes.toBytes(teid))
+          // the entire row will be skipped if the column is not found.
+          filter.setFilterIfMissing(true)
+          filters.addFilter(filter)
+        }
+      }
+    }
+
+    if (!filters.getFilters().isEmpty) {
+      scan.setFilter(filters)
+    }
+
+    scan
+  }
+
+}
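
A row key is always 32 bytes: a 16-byte MD5 of "entityType-entityId", followed by 8 bytes of event-time millis and 8 bytes of the UUID's least significant half, and it round-trips through a URL-safe Base64 string. A sketch built only from the definitions above:

    import java.util.UUID
    import org.apache.predictionio.data.storage.hbase.HBEventsUtil.RowKey

    val key = RowKey(
      entityType = "user",
      entityId = "u1",
      millis = System.currentTimeMillis(),
      uuidLow = UUID.randomUUID().getLeastSignificantBits)
    assert(key.toBytes.length == 32)       // 16 (hash) + 8 (millis) + 8 (uuid)
    val parsed = RowKey(key.toString)      // Base64 round trip
    assert(parsed.millis == key.millis)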

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
new file mode 100644
index 0000000..7d7ed40
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala
@@ -0,0 +1,192 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.hbase
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.hbase.HBEventsUtil.RowKey
+import org.apache.hadoop.hbase.HColumnDescriptor
+import org.apache.hadoop.hbase.HTableDescriptor
+import org.apache.hadoop.hbase.NamespaceDescriptor
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.joda.time.DateTime
+
+import scala.collection.JavaConversions._
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace: String)
+  extends LEvents with Logging {
+
+  // implicit val formats = DefaultFormats + new EventJson4sSupport.DBSerializer
+
+  def resultToEvent(result: Result, appId: Int): Event =
+    HBEventsUtil.resultToEvent(result, appId)
+
+  def getTable(appId: Int, channelId: Option[Int] = None): HTableInterface =
+    client.connection.getTable(HBEventsUtil.tableName(namespace, appId, channelId))
+
+  override
+  def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+    // check whether the namespace exists
+    val existingNamespace = client.admin.listNamespaceDescriptors()
+      .map(_.getName)
+    if (!existingNamespace.contains(namespace)) {
+      val nameDesc = NamespaceDescriptor.create(namespace).build()
+      info(s"The namespace ${namespace} doesn't exist yet. Creating now...")
+      client.admin.createNamespace(nameDesc)
+    }
+
+    val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId))
+    if (!client.admin.tableExists(tableName)) {
+      info(s"The table ${tableName.getNameAsString()} doesn't exist yet." +
+        " Creating now...")
+      val tableDesc = new HTableDescriptor(tableName)
+      tableDesc.addFamily(new HColumnDescriptor("e"))
+      tableDesc.addFamily(new HColumnDescriptor("r")) // reserved
+      client.admin.createTable(tableDesc)
+    }
+    true
+  }
+
+  override
+  def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
+    val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId))
+    try {
+      if (client.admin.tableExists(tableName)) {
+        info(s"Removing table ${tableName.getNameAsString()}...")
+        client.admin.disableTable(tableName)
+        client.admin.deleteTable(tableName)
+      } else {
+        info(s"Table ${tableName.getNameAsString()} doesn't exist." +
+          s" Nothing is deleted.")
+      }
+      true
+    } catch {
+      case e: Exception => {
+        error(s"Failed to remove table for appId ${appId}. Exception: ${e}")
+        false
+      }
+    }
+  }
+
+  override
+  def close(): Unit = {
+    client.admin.close()
+    client.connection.close()
+  }
+
+  override
+  def futureInsert(
+    event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
+    Future[String] = {
+    Future {
+      val table = getTable(appId, channelId)
+      val (put, rowKey) = HBEventsUtil.eventToPut(event, appId)
+      table.put(put)
+      table.flushCommits()
+      table.close()
+      rowKey.toString
+    }
+  }
+
+  override
+  def futureGet(
+    eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
+    Future[Option[Event]] = {
+      Future {
+        val table = getTable(appId, channelId)
+        val rowKey = RowKey(eventId)
+        val get = new Get(rowKey.toBytes)
+
+        val result = table.get(get)
+        table.close()
+
+        if (!result.isEmpty()) {
+          val event = resultToEvent(result, appId)
+          Some(event)
+        } else {
+          None
+        }
+      }
+    }
+
+  override
+  def futureDelete(
+    eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
+    Future[Boolean] = {
+    Future {
+      val table = getTable(appId, channelId)
+      val rowKey = RowKey(eventId)
+      val exists = table.exists(new Get(rowKey.toBytes))
+      table.delete(new Delete(rowKey.toBytes))
+      table.close()
+      exists
+    }
+  }
+
+  override
+  def futureFind(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    limit: Option[Int] = None,
+    reversed: Option[Boolean] = None)(implicit ec: ExecutionContext):
+    Future[Iterator[Event]] = {
+      Future {
+
+        require(!((reversed == Some(true)) && (entityType.isEmpty || entityId.isEmpty)),
+          "the parameter reversed can only be used with both entityType and entityId specified.")
+
+        val table = getTable(appId, channelId)
+
+        val scan = HBEventsUtil.createScan(
+          startTime = startTime,
+          untilTime = untilTime,
+          entityType = entityType,
+          entityId = entityId,
+          eventNames = eventNames,
+          targetEntityType = targetEntityType,
+          targetEntityId = targetEntityId,
+          reversed = reversed)
+        val scanner = table.getScanner(scan)
+        table.close()
+
+        val eventsIter = scanner.iterator()
+
+        // Get all events if None or Some(-1)
+        val results: Iterator[Result] = limit match {
+          case Some(-1) => eventsIter
+          case None => eventsIter
+          case Some(x) => eventsIter.take(x)
+        }
+
+        val eventsIt = results.map { resultToEvent(_, appId) }
+
+        eventsIt
+      }
+  }
+
+}
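
futureFind() forwards its arguments to HBEventsUtil.createScan, so the Option[Option[...]] convention documented there applies here as well. A hedged sketch (events is assumed to be an already-constructed HBLEvents):

    import scala.concurrent.ExecutionContext.Implicits.global

    // Some(None) keeps only events that have no target entity;
    // limit = None or Some(-1) returns everything
    val found = events.futureFind(
      appId = 1,
      entityType = Some("user"),
      entityId = Some("u1"),
      eventNames = Some(Seq("view", "buy")),
      targetEntityType = Some(None),
      limit = Some(10))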

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala
new file mode 100644
index 0000000..72254e0
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala
@@ -0,0 +1,112 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.mapreduce.PIOHBaseUtil
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce.OutputFormat
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.joda.time.DateTime
+
+class HBPEvents(client: HBClient, config: StorageClientConfig, namespace: String) extends PEvents {
+
+  def checkTableExists(appId: Int, channelId: Option[Int]): Unit = {
+    if (!client.admin.tableExists(HBEventsUtil.tableName(namespace, appId, channelId))) {
+      if (channelId.nonEmpty) {
+        logger.error(s"The appId $appId with channelId $channelId does not exist." +
+          s" Please use valid appId and channelId.")
+        throw new Exception(s"HBase table not found for appId $appId" +
+          s" with channelId $channelId.")
+      } else {
+        logger.error(s"The appId $appId does not exist. Please use valid appId.")
+        throw new Exception(s"HBase table not found for appId $appId.")
+      }
+    }
+  }
+
+  override
+  def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None
+    )(sc: SparkContext): RDD[Event] = {
+
+    checkTableExists(appId, channelId)
+
+    val conf = HBaseConfiguration.create()
+    conf.set(TableInputFormat.INPUT_TABLE,
+      HBEventsUtil.tableName(namespace, appId, channelId))
+
+    val scan = HBEventsUtil.createScan(
+        startTime = startTime,
+        untilTime = untilTime,
+        entityType = entityType,
+        entityId = entityId,
+        eventNames = eventNames,
+        targetEntityType = targetEntityType,
+        targetEntityId = targetEntityId,
+        reversed = None)
+    scan.setCaching(500) // TODO
+    scan.setCacheBlocks(false) // TODO
+
+    conf.set(TableInputFormat.SCAN, PIOHBaseUtil.convertScanToString(scan))
+
+    // HBase is not accessed until this rdd is actually used.
+    val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
+      classOf[ImmutableBytesWritable],
+      classOf[Result]).map {
+        case (key, row) => HBEventsUtil.resultToEvent(row, appId)
+      }
+
+    rdd
+  }
+
+  override
+  def write(
+    events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+
+    checkTableExists(appId, channelId)
+
+    val conf = HBaseConfiguration.create()
+    conf.set(TableOutputFormat.OUTPUT_TABLE,
+      HBEventsUtil.tableName(namespace, appId, channelId))
+    conf.setClass("mapreduce.outputformat.class",
+      classOf[TableOutputFormat[Object]],
+      classOf[OutputFormat[Object, Writable]])
+
+    events.map { event =>
+      val (put, rowKey) = HBEventsUtil.eventToPut(event, appId)
+      (new ImmutableBytesWritable(rowKey.toBytes), put)
+    }.saveAsNewAPIHadoopDataset(conf)
+
+  }
+
+}
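
find() builds the same scan and hands it to Spark through TableInputFormat, so nothing touches HBase until the returned RDD is evaluated. A hedged sketch (pevents is an HBPEvents instance and sc a live SparkContext):

    val rdd = pevents.find(
      appId = 1,
      entityType = Some("user"),
      eventNames = Some(Seq("buy")))(sc)
    val buyCount = rdd.count()  // first action: this is when HBase is read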

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala
new file mode 100644
index 0000000..1027930
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala
@@ -0,0 +1,28 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.hadoop.hbase.mapreduce
+
+/* This object is placed in the hbase.mapreduce package in order to expose
+ * its package-private static method convertScanToString().
+ */
+
+import org.apache.hadoop.hbase.client.Scan
+
+object PIOHBaseUtil {
+  def convertScanToString(scan: Scan): String = {
+    TableMapReduceUtil.convertScanToString(scan)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala
new file mode 100644
index 0000000..f25b14a
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala
@@ -0,0 +1,83 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.MasterNotRunningException
+import org.apache.hadoop.hbase.ZooKeeperConnectionException
+import org.apache.hadoop.hbase.client.HConnectionManager
+import org.apache.hadoop.hbase.client.HConnection
+import org.apache.hadoop.hbase.client.HBaseAdmin
+
+import grizzled.slf4j.Logging
+
+case class HBClient(
+  val conf: Configuration,
+  val connection: HConnection,
+  val admin: HBaseAdmin
+)
+
+class StorageClient(val config: StorageClientConfig)
+  extends BaseStorageClient with Logging {
+
+  val conf = HBaseConfiguration.create()
+
+  if (config.test) {
+    // use fewer retries and shorter timeout for test mode
+    conf.set("hbase.client.retries.number", "1")
+    conf.set("zookeeper.session.timeout", "30000");
+    conf.set("zookeeper.recovery.retry", "1")
+  }
+
+  try {
+    HBaseAdmin.checkHBaseAvailable(conf)
+  } catch {
+    case e: MasterNotRunningException =>
+      error("HBase master is not running (ZooKeeper ensemble: " +
+        conf.get("hbase.zookeeper.quorum") + "). Please make sure that HBase " +
+        "is running properly, and that the configuration is pointing at the " +
+        "correct ZooKeeper ensemble.")
+      throw e
+    case e: ZooKeeperConnectionException =>
+      error("Cannot connect to ZooKeeper (ZooKeeper ensemble: " +
+        conf.get("hbase.zookeeper.quorum") + "). Please make sure that the " +
+        "configuration is pointing at the correct ZooKeeper ensemble. By " +
+        "default, HBase manages its own ZooKeeper, so if you have not " +
+        "configured HBase to use an external ZooKeeper, that means your " +
+        "HBase is not started or configured properly.")
+      throw e
+    case e: Exception => {
+      error("Failed to connect to HBase." +
+        " Please check if HBase is running properly.")
+      throw e
+    }
+  }
+
+  val connection = HConnectionManager.createConnection(conf)
+
+  val client = HBClient(
+    conf = conf,
+    connection = connection,
+    admin = new HBaseAdmin(connection)
+  )
+
+  override
+  val prefix = "HB"
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala
new file mode 100644
index 0000000..2f8c170
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala
@@ -0,0 +1,22 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+/** HBase implementation of storage traits, supporting event data only
+  *
+  * @group Implementation
+  */
+package object hbase {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala
new file mode 100644
index 0000000..9dcfb79
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala
@@ -0,0 +1,190 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.hbase.upgrade
+
+import org.apache.predictionio.annotation.Experimental
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.storage.DataMap
+
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.client.HConnection
+import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.util.Bytes
+
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+
+import org.json4s.DefaultFormats
+import org.json4s.JObject
+import org.json4s.native.Serialization.{ read, write }
+
+import org.apache.commons.codec.binary.Base64
+
+import scala.collection.JavaConversions._
+
+/** :: Experimental :: */
+@Experimental
+object HB_0_8_0 {
+
+  implicit val formats = DefaultFormats
+
+  def getByAppId(
+    connection: HConnection,
+    namespace: String,
+    appId: Int): Iterator[Event] = {
+    val tableName = TableName.valueOf(namespace, "events")
+    val table = connection.getTable(tableName)
+    val start = PartialRowKey(appId)
+    val stop = PartialRowKey(appId + 1)
+    val scan = new Scan(start.toBytes, stop.toBytes)
+    val scanner = table.getScanner(scan)
+    table.close()
+    scanner.iterator().map { resultToEvent(_) }
+  }
+
+  val colNames: Map[String, Array[Byte]] = Map(
+    "event" -> "e",
+    "entityType" -> "ety",
+    "entityId" -> "eid",
+    "targetEntityType" -> "tety",
+    "targetEntityId" -> "teid",
+    "properties" -> "p",
+    "prId" -> "pk", // columna name is 'pk' in 0.8.0/0.8.1
+    "eventTimeZone" -> "etz",
+    "creationTimeZone" -> "ctz"
+  ).mapValues(Bytes.toBytes(_))
+
+
+  class RowKey(
+    val appId: Int,
+    val millis: Long,
+    val uuidLow: Long
+  ) {
+    lazy val toBytes: Array[Byte] = {
+      // Append the UUID's least significant bits to disambiguate events
+      // that share the same timestamp. (The UUID's most significant bits
+      // encode a timestamp themselves, so eventTime is used instead.)
+      Bytes.toBytes(appId) ++ Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow)
+    }
+    override def toString: String = {
+      Base64.encodeBase64URLSafeString(toBytes)
+    }
+  }
+
+  object RowKey {
+    // get RowKey from string representation
+    def apply(s: String): RowKey = {
+      try {
+        apply(Base64.decodeBase64(s))
+      } catch {
+        case e: Exception => throw new RowKeyException(
+          s"Failed to convert String ${s} to RowKey because ${e}", e)
+      }
+    }
+
+    def apply(b: Array[Byte]): RowKey = {
+      if (b.size != 20) {
+        val bString = b.mkString(",")
+        throw new RowKeyException(
+          s"Incorrect byte array size. Bytes: ${bString}.")
+      }
+
+      new RowKey(
+        appId = Bytes.toInt(b.slice(0, 4)),
+        millis = Bytes.toLong(b.slice(4, 12)),
+        uuidLow = Bytes.toLong(b.slice(12, 20))
+      )
+    }
+  }
+
+  class RowKeyException(msg: String, cause: Exception)
+    extends Exception(msg, cause) {
+      def this(msg: String) = this(msg, null)
+    }
+
+  case class PartialRowKey(val appId: Int, val millis: Option[Long] = None) {
+    val toBytes: Array[Byte] = {
+      Bytes.toBytes(appId) ++
+        (millis.map(Bytes.toBytes(_)).getOrElse(Array[Byte]()))
+    }
+  }
+
+  def resultToEvent(result: Result): Event = {
+    val rowKey = RowKey(result.getRow())
+
+    val eBytes = Bytes.toBytes("e")
+    // val e = result.getFamilyMap(eBytes)
+
+    def getStringCol(col: String): String = {
+      val r = result.getValue(eBytes, colNames(col))
+      require(r != null,
+        s"Failed to get value for column ${col}. " +
+        s"Rowkey: ${rowKey.toString} " +
+        s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.")
+
+      Bytes.toString(r)
+    }
+
+    def getOptStringCol(col: String): Option[String] = {
+      val r = result.getValue(eBytes, colNames(col))
+      if (r == null) {
+        None
+      } else {
+        Some(Bytes.toString(r))
+      }
+    }
+
+    def getTimestamp(col: String): Long = {
+      result.getColumnLatestCell(eBytes, colNames(col)).getTimestamp()
+    }
+
+    val event = getStringCol("event")
+    val entityType = getStringCol("entityType")
+    val entityId = getStringCol("entityId")
+    val targetEntityType = getOptStringCol("targetEntityType")
+    val targetEntityId = getOptStringCol("targetEntityId")
+    val properties: DataMap = getOptStringCol("properties")
+      .map(s => DataMap(read[JObject](s))).getOrElse(DataMap())
+    val prId = getOptStringCol("prId")
+    val eventTimeZone = getOptStringCol("eventTimeZone")
+      .map(DateTimeZone.forID(_))
+      .getOrElse(EventValidation.defaultTimeZone)
+    val creationTimeZone = getOptStringCol("creationTimeZone")
+      .map(DateTimeZone.forID(_))
+      .getOrElse(EventValidation.defaultTimeZone)
+
+    val creationTime: DateTime = new DateTime(
+      getTimestamp("event"), creationTimeZone
+    )
+
+    Event(
+      eventId = Some(RowKey(result.getRow()).toString),
+      event = event,
+      entityType = entityType,
+      entityId = entityId,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      properties = properties,
+      eventTime = new DateTime(rowKey.millis, eventTimeZone),
+      tags = Seq(),
+      prId = prId,
+      creationTime = creationTime
+    )
+  }
+}
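
To make the 0.8.0 row key layout concrete: 4 bytes of appId, 8 bytes of
event-time millis, and 8 bytes of UUID low bits, 20 bytes in total. A minimal
round-trip sketch using the classes above:

    val key = new HB_0_8_0.RowKey(appId = 7, millis = 1420070400000L,
      uuidLow = 42L)
    val bytes = key.toBytes                // 4 + 8 + 8 = 20 bytes
    val parsed = HB_0_8_0.RowKey(bytes)    // throws RowKeyException if size != 20
    assert(parsed.appId == 7 && parsed.millis == 1420070400000L)
    // PartialRowKey(appId) / PartialRowKey(appId + 1) bound a Scan over
    // all of one app's events, as getByAppId does above.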

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala
new file mode 100644
index 0000000..7ef5305
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala
@@ -0,0 +1,72 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.hbase.upgrade
+
+import org.apache.predictionio.annotation.Experimental
+
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.data.storage.hbase.HBLEvents
+import org.apache.predictionio.data.storage.hbase.HBEventsUtil
+
+import scala.collection.JavaConversions._
+
+/** :: Experimental :: */
+@Experimental
+object Upgrade {
+
+  def main(args: Array[String]) {
+    val fromAppId = args(0).toInt
+    val toAppId = args(1).toInt
+    val batchSize = args.lift(2).map(_.toInt).getOrElse(100)
+    val fromNamespace = args.lift(3).getOrElse("predictionio_eventdata")
+
+    upgrade(fromAppId, toAppId, batchSize, fromNamespace)
+  }
+
+  /* For upgrade from 0.8.0 or 0.8.1 to 0.8.2 only */
+  def upgrade(
+    fromAppId: Int,
+    toAppId: Int,
+    batchSize: Int,
+    fromNamespace: String) {
+
+    val events = Storage.getLEvents().asInstanceOf[HBLEvents]
+
+    // Assumes "pio app new <newapp>" has already been run (the new app exists)
+    // TODO: check if new table empty and warn user if not
+    val newTable = events.getTable(toAppId)
+
+    val newTableName = newTable.getName().getNameAsString()
+    println(s"Copying data from ${fromNamespace}:events for app ID ${fromAppId}"
+      + s" to new HBase table ${newTableName}...")
+
+    HB_0_8_0.getByAppId(
+      events.client.connection,
+      fromNamespace,
+      fromAppId).grouped(batchSize).foreach { eventGroup =>
+        val puts = eventGroup.map { e =>
+          val (put, _) = HBEventsUtil.eventToPut(e, toAppId)
+          put
+        }
+        newTable.put(puts.toList)
+      }
+
+    newTable.flushCommits()
+    newTable.close()
+    println("Done.")
+  }
+
+}
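
Invocation sketch; batchSize and the source namespace default to 100 and
"predictionio_eventdata" when omitted, as in main above:

    // copy app 1's 0.8.0/0.8.1 events into the table for app 2,
    // writing 500 puts per batch
    Upgrade.upgrade(
      fromAppId = 1,
      toAppId = 2,
      batchSize = 500,
      fromNamespace = "predictionio_eventdata")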

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala
new file mode 100644
index 0000000..8b80b83
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala
@@ -0,0 +1,221 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.hbase.upgrade
+
+import org.apache.predictionio.annotation.Experimental
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.hbase.HBLEvents
+import org.apache.predictionio.data.storage.hbase.HBEventsUtil
+
+import scala.collection.JavaConversions._
+
+import scala.concurrent._
+import ExecutionContext.Implicits.global
+import org.apache.predictionio.data.storage.LEvents
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import java.lang.Thread
+
+object CheckDistribution {
+  def entityType(eventClient: LEvents, appId: Int)
+  : Map[(String, Option[String]), Int] = {
+    eventClient
+    .find(appId = appId)
+    .foldLeft(Map[(String, Option[String]), Int]().withDefaultValue(0)) {
+      case (m, e) => {
+        val k = (e.entityType, e.targetEntityType)
+        m.updated(k, m(k) + 1)
+      }
+    }
+  }
+
+  def runMain(appId: Int) {
+    val eventClient = Storage.getLEvents().asInstanceOf[HBLEvents]
+
+    entityType(eventClient, appId)
+    .toSeq
+    .sortBy(-_._2)
+    .foreach { println }
+
+  }
+
+  def main(args: Array[String]) {
+    runMain(args(0).toInt)
+  }
+
+}
+
+/** :: Experimental :: */
+@Experimental
+object Upgrade_0_8_3 {
+  val NameMap = Map(
+    "pio_user" -> "user",
+    "pio_item" -> "item")
+  val RevNameMap = NameMap.toSeq.map(_.swap).toMap
+
+  val logger = Logger[this.type]
+
+  def main(args: Array[String]) {
+    val fromAppId = args(0).toInt
+    val toAppId = args(1).toInt
+
+    runMain(fromAppId, toAppId)
+  }
+
+  def runMain(fromAppId: Int, toAppId: Int): Unit = {
+    upgrade(fromAppId, toAppId)
+  }
+
+
+  val obsEntityTypes = Set("pio_user", "pio_item")
+  val obsProperties = Set(
+    "pio_itypes", "pio_starttime", "pio_endtime",
+    "pio_inactive", "pio_price", "pio_rating")
+
+  def hasPIOPrefix(eventClient: LEvents, appId: Int): Boolean = {
+    eventClient.find(appId = appId).filter( e =>
+      (obsEntityTypes.contains(e.entityType) ||
+       e.targetEntityType.map(obsEntityTypes.contains(_)).getOrElse(false) ||
+       e.properties.keySet.exists(obsProperties.contains(_))
+      )
+    ).hasNext
+  }
+
+  def isEmpty(eventClient: LEvents, appId: Int): Boolean =
+    !eventClient.find(appId = appId).hasNext
+
+
+  def upgradeCopy(eventClient: LEvents, fromAppId: Int, toAppId: Int) {
+    val fromDist = CheckDistribution.entityType(eventClient, fromAppId)
+
+    logger.info("FromAppId Distribution")
+    fromDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) }
+
+    eventClient
+    .find(appId = fromAppId)
+    .zipWithIndex
+    .foreach { case (fromEvent, index) => {
+      if (index % 50000 == 0) {
+        // logger.info(s"Progress: $fromEvent $index")
+        logger.info(s"Progress: $index")
+      }
+
+
+      val fromEntityType = fromEvent.entityType
+      val toEntityType = NameMap.getOrElse(fromEntityType, fromEntityType)
+
+      val fromTargetEntityType = fromEvent.targetEntityType
+      val toTargetEntityType = fromTargetEntityType
+        .map { et => NameMap.getOrElse(et, et) }
+
+      val toProperties = DataMap(fromEvent.properties.fields.map {
+        case (k, v) =>
+          val newK = if (obsProperties.contains(k)) {
+            val nK = k.stripPrefix("pio_")
+            logger.info(s"property ${k} will be renamed to ${nK}")
+            nK
+          } else k
+          (newK, v)
+      })
+
+      val toEvent = fromEvent.copy(
+        entityType = toEntityType,
+        targetEntityType = toTargetEntityType,
+        properties = toProperties)
+
+      eventClient.insert(toEvent, toAppId)
+    }}
+
+
+    val toDist = CheckDistribution.entityType(eventClient, toAppId)
+
+    logger.info("Recap fromAppId Distribution")
+    fromDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) }
+
+    logger.info("ToAppId Distribution")
+    toDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) }
+
+    val fromGood = fromDist
+      .toSeq
+      .forall { case (k, c) => {
+        val (et, tet) = k
+        val net = NameMap.getOrElse(et, et)
+        val ntet = tet.map(tet => NameMap.getOrElse(tet, tet))
+        val nk = (net, ntet)
+        val nc = toDist.getOrElse(nk, -1)
+        val checkMatch = (c == nc)
+        if (!checkMatch) {
+          logger.info(s"${k} doesn't match: old has ${c}. new has ${nc}.")
+        }
+        checkMatch
+      }}
+
+    val toGood = toDist
+      .toSeq
+      .forall { case (k, c) => {
+        val (et, tet) = k
+        val oet = RevNameMap.getOrElse(et, et)
+        val otet = tet.map(tet => RevNameMap.getOrElse(tet, tet))
+        val ok = (oet, otet)
+        val oc = fromDist.getOrElse(ok, -1)
+        val checkMatch = (c == oc)
+        if (!checkMatch) {
+          logger.info(s"${k} doesn't match: new has ${c}. old has ${oc}.")
+        }
+        checkMatch
+      }}
+
+    if (!fromGood || !toGood) {
+      logger.error("Doesn't match!! There is an import error.")
+    } else {
+      logger.info("Count matches. Looks like we are good to go.")
+    }
+  }
+
+  /* For upgrade from 0.8.2 to 0.8.3 only */
+  def upgrade(fromAppId: Int, toAppId: Int) {
+
+    val eventClient = Storage.getLEvents().asInstanceOf[HBLEvents]
+
+    require(fromAppId != toAppId,
+      s"FromAppId: $fromAppId must be different from toAppId: $toAppId")
+
+    if (hasPIOPrefix(eventClient, fromAppId)) {
+      require(
+        isEmpty(eventClient, toAppId),
+        s"Target appId: $toAppId is not empty. Please run " +
+        "`pio app data-delete <app_name>` to clean the data before upgrading")
+
+      logger.info(s"$fromAppId isEmpty: " + isEmpty(eventClient, fromAppId))
+
+      upgradeCopy(eventClient, fromAppId, toAppId)
+
+    } else {
+      logger.info(s"From appId: ${fromAppId} doesn't contain"
+        + s" obsolete entityTypes ${obsEntityTypes} or"
+        + s" obsolete properties ${obsProperties}."
+        + " No need data migration."
+        + s" You can continue to use appId ${fromAppId}.")
+    }
+
+    logger.info("Done.")
+  }
+
+
+}
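
Usage sketch, with both entry points defined above: inspect the source app's
entity-type distribution first, then migrate:

    // prints ((entityType, targetEntityType), count) rows, most common first
    CheckDistribution.runMain(1)
    Upgrade_0_8_3.runMain(fromAppId = 1, toAppId = 2)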

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala
new file mode 100644
index 0000000..ca967ae
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala
@@ -0,0 +1,60 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.hdfs
+
+import java.io.IOException
+
+import com.google.common.io.ByteStreams
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+class HDFSModels(fs: FileSystem, config: StorageClientConfig, prefix: String)
+  extends Models with Logging {
+
+  def insert(i: Model): Unit = {
+    try {
+      val fsdos = fs.create(new Path(s"$prefix${i.id}"))
+      fsdos.write(i.models)
+      fsdos.close()
+    } catch {
+      case e: IOException => error(e.getMessage)
+    }
+  }
+
+  def get(id: String): Option[Model] = {
+    try {
+      val p = new Path(s"$prefix$id")
+      Some(Model(
+        id = id,
+        models = ByteStreams.toByteArray(fs.open(p))))
+    } catch {
+      case e: Throwable =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def delete(id: String): Unit = {
+    val p = new Path(s"$prefix$id")
+    if (!fs.delete(p, false)) {
+      error(s"Unable to delete ${fs.makeQualified(p).toString}!")
+    }
+  }
+}
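
A minimal round trip against a Hadoop FileSystem; config here stands for the
StorageClientConfig supplied by the storage layer, and Model is the
(id, models) case class read back in get above:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.predictionio.data.storage.Model

    val fs = FileSystem.get(new Configuration())
    val store = new HDFSModels(fs, config, prefix = "pio_model_")
    store.insert(Model(id = "m1", models = Array[Byte](1, 2, 3)))
    val loaded = store.get("m1")   // Some(Model("m1", ...)) on success
    store.delete("m1")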

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala
new file mode 100644
index 0000000..3382e12
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala
@@ -0,0 +1,33 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.hdfs
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+    with Logging {
+  override val prefix = "HDFS"
+  val conf = new Configuration
+  val fs = FileSystem.get(conf)
+  fs.setWorkingDirectory(
+    new Path(config.properties.getOrElse("PATH", config.properties("HOSTS"))))
+  val client = fs
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala
new file mode 100644
index 0000000..63b34b4
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala
@@ -0,0 +1,22 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+/** HDFS implementation of storage traits, supporting model data only
+  *
+  * @group Implementation
+  */
+package object hdfs {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala
new file mode 100644
index 0000000..588cc60
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala
@@ -0,0 +1,84 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.AccessKey
+import org.apache.predictionio.data.storage.AccessKeys
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+import scala.util.Random
+
+/** JDBC implementation of [[AccessKeys]] */
+class JDBCAccessKeys(client: String, config: StorageClientConfig, prefix: String)
+  extends AccessKeys with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "accesskeys")
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      accesskey varchar(64) not null primary key,
+      appid integer not null,
+      events text)""".execute().apply()
+  }
+
+  def insert(accessKey: AccessKey): Option[String] = DB localTx { implicit s =>
+    val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
+    val events = if (accessKey.events.isEmpty) None else Some(accessKey.events.mkString(","))
+    sql"""
+    insert into $tableName values(
+      $key,
+      ${accessKey.appid},
+      $events)""".update().apply()
+    Some(key)
+  }
+
+  def get(key: String): Option[AccessKey] = DB readOnly { implicit session =>
+    sql"SELECT accesskey, appid, events FROM $tableName WHERE accesskey = $key".
+      map(resultToAccessKey).single().apply()
+  }
+
+  def getAll(): Seq[AccessKey] = DB readOnly { implicit session =>
+    sql"SELECT accesskey, appid, events FROM $tableName".map(resultToAccessKey).list().apply()
+  }
+
+  def getByAppid(appid: Int): Seq[AccessKey] = DB readOnly { implicit session =>
+    sql"SELECT accesskey, appid, events FROM $tableName WHERE appid = $appid".
+      map(resultToAccessKey).list().apply()
+  }
+
+  def update(accessKey: AccessKey): Unit = DB localTx { implicit session =>
+    val events = if (accessKey.events.isEmpty) None else Some(accessKey.events.mkString(","))
+    sql"""
+    UPDATE $tableName SET
+      appid = ${accessKey.appid},
+      events = $events
+    WHERE accesskey = ${accessKey.key}""".update().apply()
+  }
+
+  def delete(key: String): Unit = DB localTx { implicit session =>
+    sql"DELETE FROM $tableName WHERE accesskey = $key".update().apply()
+  }
+
+  /** Convert JDBC results to [[AccessKey]] */
+  def resultToAccessKey(rs: WrappedResultSet): AccessKey = {
+    AccessKey(
+      key = rs.string("accesskey"),
+      appid = rs.int("appid"),
+      events = rs.stringOpt("events").map(_.split(",").toSeq).getOrElse(Nil))
+  }
+}
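
Usage sketch, assuming a scalikejdbc ConnectionPool has been initialized for
the implicit sessions used above; client, config, and the JDBC URL stand for
whatever the storage layer supplies:

    import org.apache.predictionio.data.storage.AccessKey
    import scalikejdbc.ConnectionPool

    ConnectionPool.singleton("jdbc:postgresql://localhost/pio", "pio", "pio")
    val keys = new JDBCAccessKeys(client, config, prefix = "pio_")
    // an empty key triggers generateKey; events are stored comma-separated
    val k = keys.insert(AccessKey(key = "", appid = 1, events = Seq("rate", "buy")))
    k.flatMap(keys.get).foreach(ak => println(ak.events))   // prints the stored events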