Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:46 UTC

[15/34] incubator-predictionio git commit: rename all except examples

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/Storage.scala b/data/src/main/scala/io/prediction/data/storage/Storage.scala
deleted file mode 100644
index 3ad1400..0000000
--- a/data/src/main/scala/io/prediction/data/storage/Storage.scala
+++ /dev/null
@@ -1,403 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import java.lang.reflect.InvocationTargetException
-
-import grizzled.slf4j.Logging
-import io.prediction.annotation.DeveloperApi
-
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.language.existentials
-import scala.reflect.runtime.universe._
-
-/** :: DeveloperApi ::
-  * Any storage backend driver must implement this trait with exactly
-  * '''StorageClient''' as the class name. The PredictionIO storage layer
-  * looks for this class when it instantiates the actual backend for use by
-  * higher-level storage access APIs. (A minimal implementation sketch
-  * follows this file's diff.)
-  *
-  * @group Storage System
-  */
-@DeveloperApi
-trait BaseStorageClient {
-  /** Configuration of the '''StorageClient''' */
-  val config: StorageClientConfig
-
-  /** The actual client object. This could be a database connection or any kind
-    * of database access object.
-    */
-  val client: AnyRef
-
-  /** Set a prefix for storage class discovery. As an example, if this prefix
-    * is set as ''JDBC'', when the storage layer instantiates an implementation
-    * of [[Apps]], it will try to look for a class named ''JDBCApps''.
-    */
-  val prefix: String = ""
-}
-
-/** :: DeveloperApi ::
-  * A wrapper of storage client configuration that will be populated by
-  * PredictionIO automatically, and passed to the StorageClient during
-  * instantiation.
-  *
-  * @param parallel This is set to true by PredictionIO when the storage client
-  *                 is instantiated in a parallel data source.
-  * @param test This is set to true by PredictionIO when tests are being run.
-  * @param properties This is populated by PredictionIO automatically from
-  *                   environmental configuration variables. If you have these
-  *                   variables,
-  *                   - PIO_STORAGE_SOURCES_PGSQL_TYPE=jdbc
-  *                   - PIO_STORAGE_SOURCES_PGSQL_USERNAME=abc
-  *                   - PIO_STORAGE_SOURCES_PGSQL_PASSWORD=xyz
-  *
-  *                   this field will be filled as a map of string to string:
-  *                   - TYPE -> jdbc
-  *                   - USERNAME -> abc
-  *                   - PASSWORD -> xyz
-  *
-  * @group Storage System
-  */
-@DeveloperApi
-case class StorageClientConfig(
-  parallel: Boolean = false, // parallelized access (RDD)?
-  test: Boolean = false, // test mode config
-  properties: Map[String, String] = Map())
-
-/** :: DeveloperApi ::
-  * Thrown when a StorageClient runs into an exceptional condition
-  *
-  * @param message Exception error message
-  * @param cause The underlying exception that caused this exception
-  * @group Storage System
-  */
-@DeveloperApi
-class StorageClientException(message: String, cause: Throwable)
-  extends RuntimeException(message, cause)
-
-@deprecated("Use StorageException", "0.9.2")
-private[prediction] case class StorageError(message: String)
-
-/** :: DeveloperApi ::
-  * Thrown by data access objects when they run into exceptional conditions
-  *
-  * @param message Exception error message
-  * @param cause The underlying exception that caused this exception
-  *
-  * @group Storage System
-  */
-@DeveloperApi
-class StorageException(message: String, cause: Throwable)
-  extends Exception(message, cause) {
-
-  def this(message: String) = this(message, null)
-}
-
-/** Backend-agnostic data storage layer with lazy initialization. Use this
-  * object to interface with the Event Store from your engine. (A usage
-  * sketch follows this file's diff.)
-  *
-  * @group Storage System
-  */
-object Storage extends Logging {
-  private case class ClientMeta(
-    sourceType: String,
-    client: BaseStorageClient,
-    config: StorageClientConfig)
-
-  private case class DataObjectMeta(sourceName: String, namespace: String)
-
-  private var errors = 0
-
-  private val sourcesPrefix = "PIO_STORAGE_SOURCES"
-
-  private val sourceTypesRegex = """PIO_STORAGE_SOURCES_([^_]+)_TYPE""".r
-
-  private val sourceKeys: Seq[String] = sys.env.keys.toSeq.flatMap { k =>
-    sourceTypesRegex findFirstIn k match {
-      case Some(sourceTypesRegex(sourceType)) => Seq(sourceType)
-      case None => Nil
-    }
-  }
-
-  if (sourceKeys.isEmpty) warn("There is no properly configured data source.")
-
-  private val s2cm = scala.collection.mutable.Map[String, Option[ClientMeta]]()
-
-  /** Names of the repositories defined by the storage layer. */
-  private val EventDataRepository = "EVENTDATA"
-  private val ModelDataRepository = "MODELDATA"
-  private val MetaDataRepository = "METADATA"
-
-  private val repositoriesPrefix = "PIO_STORAGE_REPOSITORIES"
-
-  private val repositoryNamesRegex =
-    """PIO_STORAGE_REPOSITORIES_([^_]+)_NAME""".r
-
-  private val repositoryKeys: Seq[String] = sys.env.keys.toSeq.flatMap { k =>
-    repositoryNamesRegex findFirstIn k match {
-      case Some(repositoryNamesRegex(repositoryName)) => Seq(repositoryName)
-      case None => Nil
-    }
-  }
-
-  if (repositoryKeys.isEmpty) {
-    warn("There is no properly configured repository.")
-  }
-
-  private val requiredRepositories = Seq(MetaDataRepository)
-
-  requiredRepositories foreach { r =>
-    if (!repositoryKeys.contains(r)) {
-      error(s"Required repository (${r}) configuration is missing.")
-      errors += 1
-    }
-  }
-  private val repositoriesToDataObjectMeta: Map[String, DataObjectMeta] =
-    repositoryKeys.map(r =>
-      try {
-        val keyedPath = repositoriesPrefixPath(r)
-        val name = sys.env(prefixPath(keyedPath, "NAME"))
-        val sourceName = sys.env(prefixPath(keyedPath, "SOURCE"))
-        if (sourceKeys.contains(sourceName)) {
-          r -> DataObjectMeta(
-            sourceName = sourceName,
-            namespace = name)
-        } else {
-          error(s"$sourceName is not a configured storage source.")
-          r -> DataObjectMeta("", "")
-        }
-      } catch {
-        case e: Throwable =>
-          error(e.getMessage)
-          errors += 1
-          r -> DataObjectMeta("", "")
-      }
-    ).toMap
-
-  if (errors > 0) {
-    error(s"There were $errors configuration errors. Exiting.")
-    sys.exit(errors)
-  }
-
-  // End of constructor and field definitions; method definitions follow
-
-  private def prefixPath(prefix: String, body: String) = s"${prefix}_$body"
-
-  private def sourcesPrefixPath(body: String) = prefixPath(sourcesPrefix, body)
-
-  private def repositoriesPrefixPath(body: String) =
-    prefixPath(repositoriesPrefix, body)
-
-  private def sourcesToClientMeta(
-      source: String,
-      parallel: Boolean,
-      test: Boolean): Option[ClientMeta] = {
-    val sourceName = if (parallel) s"parallel-$source" else source
-    s2cm.getOrElseUpdate(sourceName, updateS2CM(source, parallel, test))
-  }
-
-  private def getClient(
-    clientConfig: StorageClientConfig,
-    pkg: String): BaseStorageClient = {
-    val className = "io.prediction.data.storage." + pkg + ".StorageClient"
-    try {
-      Class.forName(className).getConstructors()(0).newInstance(clientConfig).
-        asInstanceOf[BaseStorageClient]
-    } catch {
-      case e: ClassNotFoundException =>
-        val originalClassName = pkg + ".StorageClient"
-        Class.forName(originalClassName).getConstructors()(0).
-          newInstance(clientConfig).asInstanceOf[BaseStorageClient]
-      case e: java.lang.reflect.InvocationTargetException =>
-        throw e.getCause
-    }
-  }
-
-  /** Gets the StorageClient configuration of a source from the PIO
-    * framework's environment variables */
-  def getConfig(sourceName: String): Option[StorageClientConfig] =
-    s2cm.get(sourceName).flatten.map(_.config)
-
-  private def updateS2CM(k: String, parallel: Boolean, test: Boolean):
-  Option[ClientMeta] = {
-    try {
-      val keyedPath = sourcesPrefixPath(k)
-      val sourceType = sys.env(prefixPath(keyedPath, "TYPE"))
-      val props = sys.env.filter(t => t._1.startsWith(keyedPath)).map(
-        t => t._1.replace(s"${keyedPath}_", "") -> t._2)
-      val clientConfig = StorageClientConfig(
-        properties = props,
-        parallel = parallel,
-        test = test)
-      val client = getClient(clientConfig, sourceType)
-      Some(ClientMeta(sourceType, client, clientConfig))
-    } catch {
-      case e: Throwable =>
-        error(s"Error initializing storage client for source ${k}", e)
-        errors += 1
-        None
-    }
-  }
-
-  private[prediction]
-  def getDataObjectFromRepo[T](repo: String, test: Boolean = false)
-    (implicit tag: TypeTag[T]): T = {
-    val repoDOMeta = repositoriesToDataObjectMeta(repo)
-    val repoDOSourceName = repoDOMeta.sourceName
-    getDataObject[T](repoDOSourceName, repoDOMeta.namespace, test = test)
-  }
-
-  private[prediction]
-  def getPDataObject[T](repo: String)(implicit tag: TypeTag[T]): T = {
-    val repoDOMeta = repositoriesToDataObjectMeta(repo)
-    val repoDOSourceName = repoDOMeta.sourceName
-    getPDataObject[T](repoDOSourceName, repoDOMeta.namespace)
-  }
-
-  private[prediction] def getDataObject[T](
-      sourceName: String,
-      namespace: String,
-      parallel: Boolean = false,
-      test: Boolean = false)(implicit tag: TypeTag[T]): T = {
-    val clientMeta = sourcesToClientMeta(sourceName, parallel, test) getOrElse {
-      throw new StorageClientException(
-        s"Data source $sourceName was not properly initialized.", null)
-    }
-    val sourceType = clientMeta.sourceType
-    val ctorArgs = dataObjectCtorArgs(clientMeta.client, namespace)
-    val classPrefix = clientMeta.client.prefix
-    val originalClassName = tag.tpe.toString.split('.')
-    val rawClassName = sourceType + "." + classPrefix + originalClassName.last
-    val className = "io.prediction.data.storage." + rawClassName
-    val clazz = try {
-      Class.forName(className)
-    } catch {
-      case e: ClassNotFoundException =>
-        try {
-          Class.forName(rawClassName)
-        } catch {
-          case e: ClassNotFoundException =>
-            throw new StorageClientException("No storage backend " +
-              "implementation can be found (tried both " +
-              s"$className and $rawClassName)", e)
-        }
-    }
-    val constructor = clazz.getConstructors()(0)
-    try {
-      constructor.newInstance(ctorArgs: _*).
-        asInstanceOf[T]
-    } catch {
-      case e: IllegalArgumentException =>
-        error(
-          "Unable to instantiate data object with class '" +
-          constructor.getDeclaringClass.getName + " because its constructor" +
-          " does not have the right number of arguments." +
-          " Number of required constructor arguments: " +
-          ctorArgs.size + "." +
-          " Number of existing constructor arguments: " +
-          constructor.getParameterTypes.size + "." +
-          s" Storage source name: ${sourceName}." +
-          s" Exception message: ${e.getMessage}).", e)
-        errors += 1
-        throw e
-      case e: java.lang.reflect.InvocationTargetException =>
-        throw e.getCause
-    }
-  }
-
-  private def getPDataObject[T](
-      sourceName: String,
-      databaseName: String)(implicit tag: TypeTag[T]): T =
-    getDataObject[T](sourceName, databaseName, true)
-
-  private def dataObjectCtorArgs(
-      client: BaseStorageClient,
-      namespace: String): Seq[AnyRef] = {
-    Seq(client.client, client.config, namespace)
-  }
-
-  private[prediction] def verifyAllDataObjects(): Unit = {
-    info("Verifying Meta Data Backend (Source: " +
-      s"${repositoriesToDataObjectMeta(MetaDataRepository).sourceName})...")
-    getMetaDataEngineManifests()
-    getMetaDataEngineInstances()
-    getMetaDataEvaluationInstances()
-    getMetaDataApps()
-    getMetaDataAccessKeys()
-    info("Verifying Model Data Backend (Source: " +
-      s"${repositoriesToDataObjectMeta(ModelDataRepository).sourceName})...")
-    getModelDataModels()
-    info("Verifying Event Data Backend (Source: " +
-      s"${repositoriesToDataObjectMeta(EventDataRepository).sourceName})...")
-    val eventsDb = getLEvents(test = true)
-    info("Test writing to Event Store (App Id 0)...")
-    // use appId=0 for testing purposes
-    eventsDb.init(0)
-    eventsDb.insert(Event(
-      event = "test",
-      entityType = "test",
-      entityId = "test"), 0)
-    eventsDb.remove(0)
-    eventsDb.close()
-  }
-
-  private[prediction] def getMetaDataEngineManifests(): EngineManifests =
-    getDataObjectFromRepo[EngineManifests](MetaDataRepository)
-
-  private[prediction] def getMetaDataEngineInstances(): EngineInstances =
-    getDataObjectFromRepo[EngineInstances](MetaDataRepository)
-
-  private[prediction] def getMetaDataEvaluationInstances(): EvaluationInstances =
-    getDataObjectFromRepo[EvaluationInstances](MetaDataRepository)
-
-  private[prediction] def getMetaDataApps(): Apps =
-    getDataObjectFromRepo[Apps](MetaDataRepository)
-
-  private[prediction] def getMetaDataAccessKeys(): AccessKeys =
-    getDataObjectFromRepo[AccessKeys](MetaDataRepository)
-
-  private[prediction] def getMetaDataChannels(): Channels =
-    getDataObjectFromRepo[Channels](MetaDataRepository)
-
-  private[prediction] def getModelDataModels(): Models =
-    getDataObjectFromRepo[Models](ModelDataRepository)
-
-  /** Obtains a data access object that returns [[Event]] related local data
-    * structure.
-    */
-  def getLEvents(test: Boolean = false): LEvents =
-    getDataObjectFromRepo[LEvents](EventDataRepository, test = test)
-
-  /** Obtains a data access object that returns [[Event]] related RDD data
-    * structure.
-    */
-  def getPEvents(): PEvents =
-    getPDataObject[PEvents](EventDataRepository)
-
-  def config: Map[String, Map[String, Map[String, String]]] = Map(
-    "sources" -> s2cm.toMap.map { case (source, clientMeta) =>
-      source -> clientMeta.map { cm =>
-        Map(
-          "type" -> cm.sourceType,
-          "config" -> cm.config.properties.map(t => s"${t._1} -> ${t._2}").mkString(", ")
-        )
-      }.getOrElse(Map.empty)
-    }
-  )
-}
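
A minimal sketch of a custom backend driver implementing the BaseStorageClient
trait removed above. The package name and connection value here are
hypothetical placeholders; the one hard requirement is that the class is named
exactly StorageClient so that Storage.getClient can discover it by reflection:

    package io.prediction.data.storage.mybackend

    import io.prediction.data.storage.BaseStorageClient
    import io.prediction.data.storage.StorageClientConfig

    class StorageClient(val config: StorageClientConfig)
        extends BaseStorageClient {
      // With prefix "MyBackend", the storage layer resolves data objects such
      // as io.prediction.data.storage.mybackend.MyBackendApps
      override val prefix = "MyBackend"
      // Any connection or handle object works here; a placeholder stands in
      val client: AnyRef = config.properties.getOrElse("HOSTS", "localhost")
    }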
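And a short usage sketch of the public Storage API, mirroring what
verifyAllDataObjects does against the Event Store (appId 0 is assumed to be
free for testing, as in the code above):

    import io.prediction.data.storage.{ Event, Storage }

    // Obtain the local Event Store DAO configured via PIO_STORAGE_* variables
    val eventsDb = Storage.getLEvents()
    eventsDb.init(0)
    eventsDb.insert(Event(event = "test", entityType = "test", entityId = "test"), 0)
    eventsDb.remove(0)
    eventsDb.close()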

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/Utils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/Utils.scala b/data/src/main/scala/io/prediction/data/storage/Utils.scala
deleted file mode 100644
index bafc5e6..0000000
--- a/data/src/main/scala/io/prediction/data/storage/Utils.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import org.joda.time.DateTime
-import org.joda.time.format.ISODateTimeFormat
-
-/** Backend-agnostic storage utilities. */
-private[prediction] object Utils {
-  /**
-   * Add prefix to custom attribute keys.
-   */
-  def addPrefixToAttributeKeys[T](
-      attributes: Map[String, T],
-      prefix: String = "ca_"): Map[String, T] = {
-    attributes map { case (k, v) => (prefix + k, v) }
-  }
-
-  /** Remove prefix from custom attribute keys. */
-  def removePrefixFromAttributeKeys[T](
-      attributes: Map[String, T],
-      prefix: String = "ca_"): Map[String, T] = {
-    attributes map { case (k, v) => (k.stripPrefix(prefix), v) }
-  }
-
-  /**
-   * Appends App ID to any ID.
-   * Used for distinguishing different app's data within a single collection.
-   */
-  def idWithAppid(appid: Int, id: String): String = appid + "_" + id
-
-  def stringToDateTime(dt: String): DateTime =
-    ISODateTimeFormat.dateTimeParser.parseDateTime(dt)
-}
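
A quick sketch of these helpers in use (callable from within the io.prediction
package, since the object is private[prediction]):

    val attrs = Map("color" -> "blue", "size" -> "L")
    val prefixed = Utils.addPrefixToAttributeKeys(attrs)
    // prefixed == Map("ca_color" -> "blue", "ca_size" -> "L")
    Utils.removePrefixFromAttributeKeys(prefixed)  // back to the original map
    Utils.idWithAppid(7, "user1")                  // "7_user1"
    Utils.stringToDateTime("2015-06-01T00:00:00.000Z")  // a Joda DateTime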

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESAccessKeys.scala
deleted file mode 100644
index 7da7605..0000000
--- a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESAccessKeys.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import io.prediction.data.storage.StorageClientConfig
-import io.prediction.data.storage.AccessKey
-import io.prediction.data.storage.AccessKeys
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-import scala.util.Random
-
-/** Elasticsearch implementation of AccessKeys. */
-class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
-    extends AccessKeys with Logging {
-  implicit val formats = DefaultFormats.lossless
-  private val estype = "accesskeys"
-
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
-
-  def insert(accessKey: AccessKey): Option[String] = {
-    val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
-    update(accessKey.copy(key = key))
-    Some(key)
-  }
-
-  def get(key: String): Option[AccessKey] = {
-    try {
-      val response = client.prepareGet(
-        index,
-        estype,
-        key).get()
-      Some(read[AccessKey](response.getSourceAsString))
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-      case e: NullPointerException => None
-    }
-  }
-
-  def getAll(): Seq[AccessKey] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[AccessKey](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq[AccessKey]()
-    }
-  }
-
-  def getByAppid(appid: Int): Seq[AccessKey] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype).
-        setPostFilter(termFilter("appid", appid))
-      ESUtils.getAll[AccessKey](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq[AccessKey]()
-    }
-  }
-
-  def update(accessKey: AccessKey): Unit = {
-    try {
-      client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get()
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-    }
-  }
-
-  def delete(key: String): Unit = {
-    try {
-      client.prepareDelete(index, estype, key).get
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESApps.scala
deleted file mode 100644
index 9ea821e..0000000
--- a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESApps.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import io.prediction.data.storage.StorageClientConfig
-import io.prediction.data.storage.App
-import io.prediction.data.storage.Apps
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-/** Elasticsearch implementation of Apps. */
-class ESApps(client: Client, config: StorageClientConfig, index: String)
-  extends Apps with Logging {
-  implicit val formats = DefaultFormats.lossless
-  private val estype = "apps"
-  private val seq = new ESSequences(client, config, index)
-
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
-
-  def insert(app: App): Option[Int] = {
-    val id =
-      if (app.id == 0) {
-        var roll = seq.genNext("apps")
-        while (get(roll).isDefined) roll = seq.genNext("apps")
-        roll
-      } else app.id
-    val realapp = app.copy(id = id)
-    update(realapp)
-    Some(id)
-  }
-
-  def get(id: Int): Option[App] = {
-    try {
-      val response = client.prepareGet(
-        index,
-        estype,
-        id.toString).get()
-      Some(read[App](response.getSourceAsString))
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-      case e: NullPointerException => None
-    }
-  }
-
-  def getByName(name: String): Option[App] = {
-    try {
-      val response = client.prepareSearch(index).setTypes(estype).
-        setPostFilter(termFilter("name", name)).get
-      val hits = response.getHits().hits()
-      if (hits.size > 0) {
-        Some(read[App](hits.head.getSourceAsString))
-      } else {
-        None
-      }
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-    }
-  }
-
-  def getAll(): Seq[App] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[App](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq[App]()
-    }
-  }
-
-  def update(app: App): Unit = {
-    try {
-      val response = client.prepareIndex(index, estype, app.id.toString).
-        setSource(write(app)).get()
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-    }
-  }
-
-  def delete(id: Int): Unit = {
-    try {
-      client.prepareDelete(index, estype, id.toString).get
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESChannels.scala b/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESChannels.scala
deleted file mode 100644
index ee5e9e7..0000000
--- a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESChannels.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import io.prediction.data.storage.Channel
-import io.prediction.data.storage.Channels
-import io.prediction.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders.termFilter
-import org.json4s.DefaultFormats
-import org.json4s.JsonDSL._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-class ESChannels(client: Client, config: StorageClientConfig, index: String)
-    extends Channels with Logging {
-
-  implicit val formats = DefaultFormats.lossless
-  private val estype = "channels"
-  private val seq = new ESSequences(client, config, index)
-  private val seqName = "channels"
-
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
-
-  def insert(channel: Channel): Option[Int] = {
-    val id =
-      if (channel.id == 0) {
-        var roll = seq.genNext(seqName)
-        while (!get(roll).isEmpty) roll = seq.genNext(seqName)
-        roll
-      } else channel.id
-
-    val realChannel = channel.copy(id = id)
-    if (update(realChannel)) Some(id) else None
-  }
-
-  def get(id: Int): Option[Channel] = {
-    try {
-      val response = client.prepareGet(
-        index,
-        estype,
-        id.toString).get()
-      Some(read[Channel](response.getSourceAsString))
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-      case e: NullPointerException => None
-    }
-  }
-
-  def getByAppid(appid: Int): Seq[Channel] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype).
-        setPostFilter(termFilter("appid", appid))
-      ESUtils.getAll[Channel](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq[Channel]()
-    }
-  }
-
-  def update(channel: Channel): Boolean = {
-    try {
-      val response = client.prepareIndex(index, estype, channel.id.toString).
-        setSource(write(channel)).get()
-      true
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        false
-    }
-  }
-
-  def delete(id: Int): Unit = {
-    try {
-      client.prepareDelete(index, estype, id.toString).get
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESEngineInstances.scala b/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESEngineInstances.scala
deleted file mode 100644
index d9b0c39..0000000
--- a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESEngineInstances.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import io.prediction.data.storage.EngineInstance
-import io.prediction.data.storage.EngineInstanceSerializer
-import io.prediction.data.storage.EngineInstances
-import io.prediction.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.elasticsearch.search.sort.SortOrder
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-class ESEngineInstances(client: Client, config: StorageClientConfig, index: String)
-  extends EngineInstances with Logging {
-  implicit val formats = DefaultFormats + new EngineInstanceSerializer
-  private val estype = "engine_instances"
-
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("startTime" -> ("type" -> "date")) ~
-          ("endTime" -> ("type" -> "date")) ~
-          ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("engineVersion" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("engineVariant" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("engineFactory" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("batch" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("dataSourceParams" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("preparatorParams" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("algorithmsParams" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("servingParams" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
-
-  def insert(i: EngineInstance): String = {
-    try {
-      val response = client.prepareIndex(index, estype).
-        setSource(write(i)).get
-      response.getId
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        ""
-    }
-  }
-
-  def get(id: String): Option[EngineInstance] = {
-    try {
-      val response = client.prepareGet(index, estype, id).get
-      if (response.isExists) {
-        Some(read[EngineInstance](response.getSourceAsString))
-      } else {
-        None
-      }
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-    }
-  }
-
-  def getAll(): Seq[EngineInstance] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[EngineInstance](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
-    }
-  }
-
-  def getCompleted(
-      engineId: String,
-      engineVersion: String,
-      engineVariant: String): Seq[EngineInstance] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
-        andFilter(
-          termFilter("status", "COMPLETED"),
-          termFilter("engineId", engineId),
-          termFilter("engineVersion", engineVersion),
-          termFilter("engineVariant", engineVariant))).
-        addSort("startTime", SortOrder.DESC)
-      ESUtils.getAll[EngineInstance](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
-    }
-  }
-
-  def getLatestCompleted(
-      engineId: String,
-      engineVersion: String,
-      engineVariant: String): Option[EngineInstance] =
-    getCompleted(
-      engineId,
-      engineVersion,
-      engineVariant).headOption
-
-  def update(i: EngineInstance): Unit = {
-    try {
-      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
-    } catch {
-      case e: ElasticsearchException => error(e.getMessage)
-    }
-  }
-
-  def delete(id: String): Unit = {
-    try {
-      val response = client.prepareDelete(index, estype, id).get
-    } catch {
-      case e: ElasticsearchException => error(e.getMessage)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESEngineManifests.scala b/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESEngineManifests.scala
deleted file mode 100644
index a5333b5..0000000
--- a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESEngineManifests.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import io.prediction.data.storage.EngineManifestSerializer
-import io.prediction.data.storage.StorageClientConfig
-import io.prediction.data.storage.EngineManifest
-import io.prediction.data.storage.EngineManifests
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.json4s._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-class ESEngineManifests(client: Client, config: StorageClientConfig, index: String)
-  extends EngineManifests with Logging {
-  implicit val formats = DefaultFormats + new EngineManifestSerializer
-  private val estype = "engine_manifests"
-  private def esid(id: String, version: String) = s"$id $version"
-
-  def insert(engineManifest: EngineManifest): Unit = {
-    val json = write(engineManifest)
-    val response = client.prepareIndex(
-      index,
-      estype,
-      esid(engineManifest.id, engineManifest.version)).
-      setSource(json).execute().actionGet()
-  }
-
-  def get(id: String, version: String): Option[EngineManifest] = {
-    try {
-      val response = client.prepareGet(index, estype, esid(id, version)).
-        execute().actionGet()
-      if (response.isExists) {
-        Some(read[EngineManifest](response.getSourceAsString))
-      } else {
-        None
-      }
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-    }
-  }
-
-  def getAll(): Seq[EngineManifest] = {
-    try {
-      val builder = client.prepareSearch()
-      ESUtils.getAll[EngineManifest](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
-    }
-  }
-
-  def update(engineManifest: EngineManifest, upsert: Boolean = false): Unit =
-    insert(engineManifest)
-
-  def delete(id: String, version: String): Unit = {
-    try {
-      client.prepareDelete(index, estype, esid(id, version)).execute().actionGet()
-    } catch {
-      case e: ElasticsearchException => error(e.getMessage)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESEvaluationInstances.scala b/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESEvaluationInstances.scala
deleted file mode 100644
index ae33417..0000000
--- a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import io.prediction.data.storage.EvaluationInstance
-import io.prediction.data.storage.EvaluationInstanceSerializer
-import io.prediction.data.storage.EvaluationInstances
-import io.prediction.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.elasticsearch.search.sort.SortOrder
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String)
-  extends EvaluationInstances with Logging {
-  implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
-  private val estype = "evaluation_instances"
-
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("startTime" -> ("type" -> "date")) ~
-          ("endTime" -> ("type" -> "date")) ~
-          ("evaluationClass" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("engineParamsGeneratorClass" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("batch" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("evaluatorResults" ->
-            ("type" -> "string") ~ ("index" -> "no")) ~
-          ("evaluatorResultsHTML" ->
-            ("type" -> "string") ~ ("index" -> "no")) ~
-          ("evaluatorResultsJSON" ->
-            ("type" -> "string") ~ ("index" -> "no"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
-
-  def insert(i: EvaluationInstance): String = {
-    try {
-      val response = client.prepareIndex(index, estype).
-        setSource(write(i)).get
-      response.getId
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        ""
-    }
-  }
-
-  def get(id: String): Option[EvaluationInstance] = {
-    try {
-      val response = client.prepareGet(index, estype, id).get
-      if (response.isExists) {
-        Some(read[EvaluationInstance](response.getSourceAsString))
-      } else {
-        None
-      }
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-    }
-  }
-
-  def getAll(): Seq[EvaluationInstance] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[EvaluationInstance](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
-    }
-  }
-
-  def getCompleted(): Seq[EvaluationInstance] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
-        termFilter("status", "EVALCOMPLETED")).
-        addSort("startTime", SortOrder.DESC)
-      ESUtils.getAll[EvaluationInstance](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
-    }
-  }
-
-  def update(i: EvaluationInstance): Unit = {
-    try {
-      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
-    } catch {
-      case e: ElasticsearchException => error(e.getMessage)
-    }
-  }
-
-  def delete(id: String): Unit = {
-    try {
-      client.prepareDelete(index, estype, id).get
-    } catch {
-      case e: ElasticsearchException => error(e.getMessage)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESSequences.scala b/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESSequences.scala
deleted file mode 100644
index 99ab253..0000000
--- a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESSequences.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import io.prediction.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-
-class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging {
-  implicit val formats = DefaultFormats
-  private val estype = "sequences"
-
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    // val settingsJson =
-    //   ("number_of_shards" -> 1) ~
-    //   ("auto_expand_replicas" -> "0-all")
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val mappingJson =
-      (estype ->
-        ("_source" -> ("enabled" -> 0)) ~
-        ("_all" -> ("enabled" -> 0)) ~
-        ("_type" -> ("index" -> "no")) ~
-        ("enabled" -> 0))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(mappingJson))).get
-  }
-
-  def genNext(name: String): Int = {
-    try {
-      val response = client.prepareIndex(index, estype, name).
-        setSource(compact(render("n" -> name))).get
-      response.getVersion().toInt
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        0
-    }
-  }
-}
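
genNext leans on Elasticsearch document versioning: re-indexing a document
under the same id bumps its version, so the returned version number acts as a
monotonically increasing counter per sequence name. A usage sketch for the
class above, assuming client, config, and index are in scope:

    val seq = new ESSequences(client, config, index)
    // Successive calls return 1, 2, 3, ... for the "apps" sequence
    val nextAppId = seq.genNext("apps")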

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESUtils.scala b/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESUtils.scala
deleted file mode 100644
index 7cf693c..0000000
--- a/data/src/main/scala/io/prediction/data/storage/elasticsearch/ESUtils.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.elasticsearch
-
-import org.elasticsearch.action.search.SearchRequestBuilder
-import org.elasticsearch.client.Client
-import org.elasticsearch.common.unit.TimeValue
-import org.json4s.Formats
-import org.json4s.native.Serialization.read
-
-import scala.collection.mutable.ArrayBuffer
-
-object ESUtils {
-  val scrollLife = new TimeValue(60000)
-
-  def getAll[T : Manifest](
-      client: Client,
-      builder: SearchRequestBuilder)(
-      implicit formats: Formats): Seq[T] = {
-    val results = ArrayBuffer[T]()
-    var response = builder.setScroll(scrollLife).get
-    var hits = response.getHits().hits()
-    results ++= hits.map(h => read[T](h.getSourceAsString))
-    while (hits.size > 0) {
-      response = client.prepareSearchScroll(response.getScrollId).
-        setScroll(scrollLife).get
-      hits = response.getHits().hits()
-      results ++= hits.map(h => read[T](h.getSourceAsString))
-    }
-    results
-  }
-}
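
getAll drains a scroll cursor page by page until Elasticsearch returns an
empty page. A usage sketch, assuming client, index, and the App case class are
in scope, and that a json4s Formats is available (the DAOs above use
DefaultFormats.lossless):

    implicit val formats: org.json4s.Formats =
      org.json4s.DefaultFormats.lossless

    // Fetch every document of type "apps" in the index, across all pages
    val apps: Seq[App] = ESUtils.getAll[App](
      client, client.prepareSearch(index).setTypes("apps"))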

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/elasticsearch/StorageClient.scala b/data/src/main/scala/io/prediction/data/storage/elasticsearch/StorageClient.scala
deleted file mode 100644
index 8f550c2..0000000
--- a/data/src/main/scala/io/prediction/data/storage/elasticsearch/StorageClient.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import io.prediction.data.storage.BaseStorageClient
-import io.prediction.data.storage.StorageClientConfig
-import io.prediction.data.storage.StorageClientException
-import org.elasticsearch.client.transport.TransportClient
-import org.elasticsearch.common.settings.ImmutableSettings
-import org.elasticsearch.common.transport.InetSocketTransportAddress
-import org.elasticsearch.transport.ConnectTransportException
-
-class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
-    with Logging {
-  override val prefix = "ES"
-  val client = try {
-    val hosts = config.properties.get("HOSTS").
-      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
-    val ports = config.properties.get("PORTS").
-      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300))
-    val settings = ImmutableSettings.settingsBuilder()
-      .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch"))
-    val transportClient = new TransportClient(settings)
-    (hosts zip ports) foreach { hp =>
-      transportClient.addTransportAddress(
-        new InetSocketTransportAddress(hp._1, hp._2))
-    }
-    transportClient
-  } catch {
-    case e: ConnectTransportException =>
-      throw new StorageClientException(e.getMessage, e)
-  }
-}
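
Tying this back to Storage.updateS2CM: with environment variables such as
PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch and
PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost, the storage layer strips
the PIO_STORAGE_SOURCES_ELASTICSEARCH_ prefix and hands this class a config
equivalent to the following sketch (values are illustrative):

    import io.prediction.data.storage.StorageClientConfig

    val config = StorageClientConfig(properties = Map(
      "TYPE" -> "elasticsearch", // selects the ...storage.elasticsearch package
      "HOSTS" -> "localhost",    // comma-separated lists are also accepted
      "PORTS" -> "9300",
      "CLUSTERNAME" -> "elasticsearch"))
    // Yields a TransportClient bound to localhost:9300
    val esClient = new StorageClient(config).client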

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/elasticsearch/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/elasticsearch/package.scala b/data/src/main/scala/io/prediction/data/storage/elasticsearch/package.scala
deleted file mode 100644
index daa3bc3..0000000
--- a/data/src/main/scala/io/prediction/data/storage/elasticsearch/package.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-/** Elasticsearch implementation of storage traits, supporting meta data only
-  *
-  * @group Implementation
-  */
-package object elasticsearch {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/hbase/HBEventsUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/hbase/HBEventsUtil.scala b/data/src/main/scala/io/prediction/data/storage/hbase/HBEventsUtil.scala
deleted file mode 100644
index 294961f..0000000
--- a/data/src/main/scala/io/prediction/data/storage/hbase/HBEventsUtil.scala
+++ /dev/null
@@ -1,412 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.hbase
-
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventValidation
-import io.prediction.data.storage.DataMap
-
-import org.apache.hadoop.hbase.client.Result
-import org.apache.hadoop.hbase.client.Put
-import org.apache.hadoop.hbase.client.Scan
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.filter.FilterList
-import org.apache.hadoop.hbase.filter.RegexStringComparator
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
-import org.apache.hadoop.hbase.filter.BinaryComparator
-import org.apache.hadoop.hbase.filter.QualifierFilter
-import org.apache.hadoop.hbase.filter.SkipFilter
-
-import org.json4s.DefaultFormats
-import org.json4s.JObject
-import org.json4s.native.Serialization.{ read, write }
-
-import org.joda.time.DateTime
-import org.joda.time.DateTimeZone
-
-import org.apache.commons.codec.binary.Base64
-import java.security.MessageDigest
-
-import java.util.UUID
-
-/* Common utility functions for accessing the Event Store in HBase */
-object HBEventsUtil {
-
-  implicit val formats = DefaultFormats
-
-  def tableName(namespace: String, appId: Int, channelId: Option[Int] = None): String = {
-    channelId.map { ch =>
-      s"${namespace}:events_${appId}_${ch}"
-    }.getOrElse {
-      s"${namespace}:events_${appId}"
-    }
-  }
-
-  // column names for "e" column family
-  val colNames: Map[String, Array[Byte]] = Map(
-    "event" -> "e",
-    "entityType" -> "ety",
-    "entityId" -> "eid",
-    "targetEntityType" -> "tety",
-    "targetEntityId" -> "teid",
-    "properties" -> "p",
-    "prId" -> "prid",
-    "eventTime" -> "et",
-    "eventTimeZone" -> "etz",
-    "creationTime" -> "ct",
-    "creationTimeZone" -> "ctz"
-  ).mapValues(Bytes.toBytes(_))
-
-  def hash(entityType: String, entityId: String): Array[Byte] = {
-    val s = entityType + "-" + entityId
-    // get a new MessageDigest object each time for thread safety
-    val md5 = MessageDigest.getInstance("MD5")
-    md5.digest(Bytes.toBytes(s))
-  }
-
-  class RowKey(
-    val b: Array[Byte]
-  ) {
-    require((b.size == 32), s"Incorrect b size: ${b.size}")
-    lazy val entityHash: Array[Byte] = b.slice(0, 16)
-    lazy val millis: Long = Bytes.toLong(b.slice(16, 24))
-    lazy val uuidLow: Long = Bytes.toLong(b.slice(24, 32))
-
-    lazy val toBytes: Array[Byte] = b
-
-    override def toString: String = {
-      Base64.encodeBase64URLSafeString(toBytes)
-    }
-  }
-
-  object RowKey {
-    def apply(
-      entityType: String,
-      entityId: String,
-      millis: Long,
-      uuidLow: Long): RowKey = {
-        // append the UUID's least significant bits so that multiple events
-        // at the same millisecond get distinct keys (the UUID's most
-        // significant bits encode a timestamp; eventTime is used instead).
-        val b = hash(entityType, entityId) ++
-          Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow)
-        new RowKey(b)
-      }
-
-    // get RowKey from string representation
-    def apply(s: String): RowKey = {
-      try {
-        apply(Base64.decodeBase64(s))
-      } catch {
-        case e: Exception => throw new RowKeyException(
-          s"Failed to convert String ${s} to RowKey because ${e}", e)
-      }
-    }
-
-    def apply(b: Array[Byte]): RowKey = {
-      if (b.size != 32) {
-        val bString = b.mkString(",")
-        throw new RowKeyException(
-          s"Incorrect byte array size. Bytes: ${bString}.")
-      }
-      new RowKey(b)
-    }
-
-  }
-
-  class RowKeyException(val msg: String, val cause: Exception)
-    extends Exception(msg, cause) {
-      def this(msg: String) = this(msg, null)
-    }
-
-  case class PartialRowKey(entityType: String, entityId: String,
-    millis: Option[Long] = None) {
-    val toBytes: Array[Byte] = {
-      hash(entityType, entityId) ++
-        (millis.map(Bytes.toBytes(_)).getOrElse(Array[Byte]()))
-    }
-  }
-
-  def eventToPut(event: Event, appId: Int): (Put, RowKey) = {
-    // generate new rowKey if eventId is None
-    val rowKey = event.eventId.map { id =>
-      RowKey(id) // create rowKey from eventId
-    }.getOrElse {
-      // TODO: use a real UUID, not pseudo-random bits
-      val uuidLow: Long = UUID.randomUUID().getLeastSignificantBits
-      RowKey(
-        entityType = event.entityType,
-        entityId = event.entityId,
-        millis = event.eventTime.getMillis,
-        uuidLow = uuidLow
-      )
-    }
-
-    val eBytes = Bytes.toBytes("e")
-    // use eventTime as HBase's cell timestamp
-    val put = new Put(rowKey.toBytes, event.eventTime.getMillis)
-
-    def addStringToE(col: Array[Byte], v: String): Put = {
-      put.add(eBytes, col, Bytes.toBytes(v))
-    }
-
-    def addLongToE(col: Array[Byte], v: Long): Put = {
-      put.add(eBytes, col, Bytes.toBytes(v))
-    }
-
-    addStringToE(colNames("event"), event.event)
-    addStringToE(colNames("entityType"), event.entityType)
-    addStringToE(colNames("entityId"), event.entityId)
-
-    event.targetEntityType.foreach { targetEntityType =>
-      addStringToE(colNames("targetEntityType"), targetEntityType)
-    }
-
-    event.targetEntityId.foreach { targetEntityId =>
-      addStringToE(colNames("targetEntityId"), targetEntityId)
-    }
-
-    // TODO: make properties Option[]
-    if (!event.properties.isEmpty) {
-      addStringToE(colNames("properties"), write(event.properties.toJObject))
-    }
-
-    event.prId.foreach { prId =>
-      addStringToE(colNames("prId"), prId)
-    }
-
-    addLongToE(colNames("eventTime"), event.eventTime.getMillis)
-    val eventTimeZone = event.eventTime.getZone
-    if (!eventTimeZone.equals(EventValidation.defaultTimeZone)) {
-      addStringToE(colNames("eventTimeZone"), eventTimeZone.getID)
-    }
-
-    addLongToE(colNames("creationTime"), event.creationTime.getMillis)
-    val creationTimeZone = event.creationTime.getZone
-    if (!creationTimeZone.equals(EventValidation.defaultTimeZone)) {
-      addStringToE(colNames("creationTimeZone"), creationTimeZone.getID)
-    }
-
-    // can use zero-length byte array for tag cell value
-    (put, rowKey)
-  }
-
-  def resultToEvent(result: Result, appId: Int): Event = {
-    val rowKey = RowKey(result.getRow())
-
-    val eBytes = Bytes.toBytes("e")
-    // val e = result.getFamilyMap(eBytes)
-
-    def getStringCol(col: String): String = {
-      val r = result.getValue(eBytes, colNames(col))
-      require(r != null,
-        s"Failed to get value for column ${col}. " +
-        s"Rowkey: ${rowKey.toString} " +
-        s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.")
-
-      Bytes.toString(r)
-    }
-
-    def getLongCol(col: String): Long = {
-      val r = result.getValue(eBytes, colNames(col))
-      require(r != null,
-        s"Failed to get value for column ${col}. " +
-        s"Rowkey: ${rowKey.toString} " +
-        s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.")
-
-      Bytes.toLong(r)
-    }
-
-    def getOptStringCol(col: String): Option[String] = {
-      val r = result.getValue(eBytes, colNames(col))
-      if (r == null) {
-        None
-      } else {
-        Some(Bytes.toString(r))
-      }
-    }
-
-    def getTimestamp(col: String): Long = {
-      result.getColumnLatestCell(eBytes, colNames(col)).getTimestamp()
-    }
-
-    val event = getStringCol("event")
-    val entityType = getStringCol("entityType")
-    val entityId = getStringCol("entityId")
-    val targetEntityType = getOptStringCol("targetEntityType")
-    val targetEntityId = getOptStringCol("targetEntityId")
-    val properties: DataMap = getOptStringCol("properties")
-      .map(s => DataMap(read[JObject](s))).getOrElse(DataMap())
-    val prId = getOptStringCol("prId")
-    val eventTimeZone = getOptStringCol("eventTimeZone")
-      .map(DateTimeZone.forID(_))
-      .getOrElse(EventValidation.defaultTimeZone)
-    val eventTime = new DateTime(
-      getLongCol("eventTime"), eventTimeZone)
-    val creationTimeZone = getOptStringCol("creationTimeZone")
-      .map(DateTimeZone.forID(_))
-      .getOrElse(EventValidation.defaultTimeZone)
-    val creationTime: DateTime = new DateTime(
-      getLongCol("creationTime"), creationTimeZone)
-
-    Event(
-      eventId = Some(RowKey(result.getRow()).toString),
-      event = event,
-      entityType = entityType,
-      entityId = entityId,
-      targetEntityType = targetEntityType,
-      targetEntityId = targetEntityId,
-      properties = properties,
-      eventTime = eventTime,
-      tags = Seq(),
-      prId = prId,
-      creationTime = creationTime
-    )
-  }
-
-
-  // For a mandatory field, None means "don't care".
-  // For an optional field, None means "don't care",
-  //    Some(None) means the field must not exist, and
-  //    Some(Some(x)) means the field must equal x.
-  def createScan(
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None,
-    reversed: Option[Boolean] = None): Scan = {
-
-    val scan: Scan = new Scan()
-
-    (entityType, entityId) match {
-      case (Some(et), Some(eid)) => {
-        val start = PartialRowKey(et, eid,
-          startTime.map(_.getMillis)).toBytes
-        // if no untilTime, stop at the end of this entity's key range
-        val stop = PartialRowKey(et, eid,
-          untilTime.map(_.getMillis).orElse(Some(-1))).toBytes
-
-        if (reversed.getOrElse(false)) {
-          // Reversed order: to scan in reverse, the startRow must be
-          // lexicographically after the stopRow.
-          scan.setStartRow(stop)
-          scan.setStopRow(start)
-          scan.setReversed(true)
-        } else {
-          scan.setStartRow(start)
-          scan.setStopRow(stop)
-        }
-      }
-      case (_, _) => {
-        val minTime: Long = startTime.map(_.getMillis).getOrElse(0)
-        val maxTime: Long = untilTime.map(_.getMillis).getOrElse(Long.MaxValue)
-        scan.setTimeRange(minTime, maxTime)
-        if (reversed.getOrElse(false)) {
-          scan.setReversed(true)
-        }
-      }
-    }
-
-    val filters = new FilterList(FilterList.Operator.MUST_PASS_ALL)
-
-    val eBytes = Bytes.toBytes("e")
-
-    def createBinaryFilter(col: String, value: Array[Byte]): SingleColumnValueFilter = {
-      val comp = new BinaryComparator(value)
-      new SingleColumnValueFilter(
-        eBytes, colNames(col), CompareOp.EQUAL, comp)
-    }
-
-    // skip the row if the column exists
-    def createSkipRowIfColumnExistFilter(col: String): SkipFilter = {
-      val comp = new BinaryComparator(colNames(col))
-      val q = new QualifierFilter(CompareOp.NOT_EQUAL, comp)
-      // filters an entire row if any of the Cell checks do not pass
-      new SkipFilter(q)
-    }
-
-    entityType.foreach { et =>
-      val compType = new BinaryComparator(Bytes.toBytes(et))
-      val filterType = new SingleColumnValueFilter(
-        eBytes, colNames("entityType"), CompareOp.EQUAL, compType)
-      filters.addFilter(filterType)
-    }
-
-    entityId.foreach { eid =>
-      val compId = new BinaryComparator(Bytes.toBytes(eid))
-      val filterId = new SingleColumnValueFilter(
-        eBytes, colNames("entityId"), CompareOp.EQUAL, compId)
-      filters.addFilter(filterId)
-    }
-
-    eventNames.foreach { eventsList =>
-      // match any event in eventsList
-      val eventFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE)
-      eventsList.foreach { e =>
-        val compEvent = new BinaryComparator(Bytes.toBytes(e))
-        val filterEvent = new SingleColumnValueFilter(
-          eBytes, colNames("event"), CompareOp.EQUAL, compEvent)
-        eventFilters.addFilter(filterEvent)
-      }
-      if (!eventFilters.getFilters().isEmpty) {
-        filters.addFilter(eventFilters)
-      }
-    }
-
-    targetEntityType.foreach { tetOpt =>
-      if (tetOpt.isEmpty) {
-        val filter = createSkipRowIfColumnExistFilter("targetEntityType")
-        filters.addFilter(filter)
-      } else {
-        tetOpt.foreach { tet =>
-          val filter = createBinaryFilter(
-            "targetEntityType", Bytes.toBytes(tet))
-          // the entire row will be skipped if the column is not found.
-          filter.setFilterIfMissing(true)
-          filters.addFilter(filter)
-        }
-      }
-    }
-
-    targetEntityId.foreach { teidOpt =>
-      if (teidOpt.isEmpty) {
-        val filter = createSkipRowIfColumnExistFilter("targetEntityId")
-        filters.addFilter(filter)
-      } else {
-        teidOpt.foreach { teid =>
-          val filter = createBinaryFilter(
-            "targetEntityId", Bytes.toBytes(teid))
-          // the entire row will be skipped if the column is not found.
-          filter.setFilterIfMissing(true)
-          filters.addFilter(filter)
-        }
-      }
-    }
-
-    if (!filters.getFilters().isEmpty) {
-      scan.setFilter(filters)
-    }
-
-    scan
-  }
-
-}
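
For reference, the 32-byte row key built by RowKey.apply above can be sketched
without any HBase dependency. The following is a minimal, illustrative Scala
snippet (not part of the removed file); java.nio.ByteBuffer stands in for
HBase's Bytes utility, and both write longs big-endian:

    import java.nio.ByteBuffer
    import java.security.MessageDigest
    import java.util.UUID

    object RowKeySketch {
      // same layout as HBEventsUtil.RowKey:
      // MD5(entityType + "-" + entityId) ++ millis ++ uuidLow
      def rowKey(entityType: String, entityId: String, millis: Long): Array[Byte] = {
        // a fresh MessageDigest per call keeps this thread-safe
        val hash = MessageDigest.getInstance("MD5")
          .digest(s"$entityType-$entityId".getBytes("UTF-8"))
        ByteBuffer.allocate(32)
          .put(hash)       // bytes 0-15: entity hash
          .putLong(millis) // bytes 16-23: event time, big-endian like Bytes.toBytes
          .putLong(UUID.randomUUID().getLeastSignificantBits) // bytes 24-31: tie-breaker
          .array()
      }

      def main(args: Array[String]): Unit =
        println(rowKey("user", "u0", System.currentTimeMillis()).length) // 32
    }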

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/hbase/HBLEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/hbase/HBLEvents.scala b/data/src/main/scala/io/prediction/data/storage/hbase/HBLEvents.scala
deleted file mode 100644
index 6985ebe..0000000
--- a/data/src/main/scala/io/prediction/data/storage/hbase/HBLEvents.scala
+++ /dev/null
@@ -1,192 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.hbase
-
-import grizzled.slf4j.Logging
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.LEvents
-import io.prediction.data.storage.StorageClientConfig
-import io.prediction.data.storage.hbase.HBEventsUtil.RowKey
-import org.apache.hadoop.hbase.HColumnDescriptor
-import org.apache.hadoop.hbase.HTableDescriptor
-import org.apache.hadoop.hbase.NamespaceDescriptor
-import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.client._
-import org.joda.time.DateTime
-
-import scala.collection.JavaConversions._
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-
-class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace: String)
-  extends LEvents with Logging {
-
-  // implicit val formats = DefaultFormats + new EventJson4sSupport.DBSerializer
-
-  def resultToEvent(result: Result, appId: Int): Event =
-    HBEventsUtil.resultToEvent(result, appId)
-
-  def getTable(appId: Int, channelId: Option[Int] = None): HTableInterface =
-    client.connection.getTable(HBEventsUtil.tableName(namespace, appId, channelId))
-
-  override
-  def init(appId: Int, channelId: Option[Int] = None): Boolean = {
-    // check whether the namespace exists
-    val existingNamespace = client.admin.listNamespaceDescriptors()
-      .map(_.getName)
-    if (!existingNamespace.contains(namespace)) {
-      val nameDesc = NamespaceDescriptor.create(namespace).build()
-      info(s"The namespace ${namespace} doesn't exist yet. Creating now...")
-      client.admin.createNamespace(nameDesc)
-    }
-
-    val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId))
-    if (!client.admin.tableExists(tableName)) {
-      info(s"The table ${tableName.getNameAsString()} doesn't exist yet." +
-        " Creating now...")
-      val tableDesc = new HTableDescriptor(tableName)
-      tableDesc.addFamily(new HColumnDescriptor("e"))
-      tableDesc.addFamily(new HColumnDescriptor("r")) // reserved
-      client.admin.createTable(tableDesc)
-    }
-    true
-  }
-
-  override
-  def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
-    val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId))
-    try {
-      if (client.admin.tableExists(tableName)) {
-        info(s"Removing table ${tableName.getNameAsString()}...")
-        client.admin.disableTable(tableName)
-        client.admin.deleteTable(tableName)
-      } else {
-        info(s"Table ${tableName.getNameAsString()} doesn't exist." +
-          s" Nothing is deleted.")
-      }
-      true
-    } catch {
-      case e: Exception => {
-        error(s"Fail to remove table for appId ${appId}. Exception: ${e}")
-        false
-      }
-    }
-  }
-
-  override
-  def close(): Unit = {
-    client.admin.close()
-    client.connection.close()
-  }
-
-  override
-  def futureInsert(
-    event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
-    Future[String] = {
-    Future {
-      val table = getTable(appId, channelId)
-      val (put, rowKey) = HBEventsUtil.eventToPut(event, appId)
-      table.put(put)
-      table.flushCommits()
-      table.close()
-      rowKey.toString
-    }
-  }
-
-  override
-  def futureGet(
-    eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
-    Future[Option[Event]] = {
-      Future {
-        val table = getTable(appId, channelId)
-        val rowKey = RowKey(eventId)
-        val get = new Get(rowKey.toBytes)
-
-        val result = table.get(get)
-        table.close()
-
-        if (!result.isEmpty()) {
-          val event = resultToEvent(result, appId)
-          Some(event)
-        } else {
-          None
-        }
-      }
-    }
-
-  override
-  def futureDelete(
-    eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
-    Future[Boolean] = {
-    Future {
-      val table = getTable(appId, channelId)
-      val rowKey = RowKey(eventId)
-      val exists = table.exists(new Get(rowKey.toBytes))
-      table.delete(new Delete(rowKey.toBytes))
-      table.close()
-      exists
-    }
-  }
-
-  override
-  def futureFind(
-    appId: Int,
-    channelId: Option[Int] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None,
-    limit: Option[Int] = None,
-    reversed: Option[Boolean] = None)(implicit ec: ExecutionContext):
-    Future[Iterator[Event]] = {
-      Future {
-
-        require(!((reversed == Some(true)) && (entityType.isEmpty || entityId.isEmpty)),
-          "The parameter reversed can only be used with both entityType and entityId specified.")
-
-        val table = getTable(appId, channelId)
-
-        val scan = HBEventsUtil.createScan(
-          startTime = startTime,
-          untilTime = untilTime,
-          entityType = entityType,
-          entityId = entityId,
-          eventNames = eventNames,
-          targetEntityType = targetEntityType,
-          targetEntityId = targetEntityId,
-          reversed = reversed)
-        val scanner = table.getScanner(scan)
-        table.close()
-
-        val eventsIter = scanner.iterator()
-
-        // Get all events if None or Some(-1)
-        val results: Iterator[Result] = limit match {
-          case Some(-1) => eventsIter
-          case None => eventsIter
-          case Some(x) => eventsIter.take(x)
-        }
-
-        val eventsIt = results.map { resultToEvent(_, appId) }
-
-        eventsIt
-      }
-  }
-
-}
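
A hypothetical caller of the LEvents methods above might look like the sketch
below. It blocks on the futures only for brevity; the `events` parameter
stands for an already-constructed HBLEvents instance whose app has been set up
via init(), and the named arguments assume Event's remaining fields carry
defaults. All concrete names and IDs are illustrative:

    import scala.concurrent.{Await, ExecutionContext}
    import scala.concurrent.duration._
    import io.prediction.data.storage.{DataMap, Event}
    import io.prediction.data.storage.hbase.HBLEvents
    import org.joda.time.DateTime

    object LEventsSketch {
      // writes one event and reads it back by its generated ID
      def roundTrip(events: HBLEvents)(implicit ec: ExecutionContext): Option[Event] = {
        val event = Event(
          event = "rate",
          entityType = "user",
          entityId = "u0",
          targetEntityType = Some("item"),
          targetEntityId = Some("i0"),
          properties = DataMap(),      // empty properties are skipped on write
          eventTime = new DateTime())

        // futureInsert yields the Base64-encoded row key as the event ID
        val eventId = Await.result(
          events.futureInsert(event, appId = 1, channelId = None), 10.seconds)
        Await.result(events.futureGet(eventId, appId = 1, channelId = None), 10.seconds)
      }
    }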

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/hbase/HBPEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/hbase/HBPEvents.scala b/data/src/main/scala/io/prediction/data/storage/hbase/HBPEvents.scala
deleted file mode 100644
index 9d72529..0000000
--- a/data/src/main/scala/io/prediction/data/storage/hbase/HBPEvents.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.hbase
-
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.PEvents
-import io.prediction.data.storage.StorageClientConfig
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.client.Result
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.hbase.mapreduce.PIOHBaseUtil
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.OutputFormat
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.joda.time.DateTime
-
-class HBPEvents(client: HBClient, config: StorageClientConfig, namespace: String) extends PEvents {
-
-  def checkTableExists(appId: Int, channelId: Option[Int]): Unit = {
-    if (!client.admin.tableExists(HBEventsUtil.tableName(namespace, appId, channelId))) {
-      if (channelId.nonEmpty) {
-        logger.error(s"The appId $appId with channelId $channelId does not exist." +
-          s" Please use valid appId and channelId.")
-        throw new Exception(s"HBase table not found for appId $appId" +
-          s" with channelId $channelId.")
-      } else {
-        logger.error(s"The appId $appId does not exist. Please use valid appId.")
-        throw new Exception(s"HBase table not found for appId $appId.")
-      }
-    }
-  }
-
-  override
-  def find(
-    appId: Int,
-    channelId: Option[Int] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None
-    )(sc: SparkContext): RDD[Event] = {
-
-    checkTableExists(appId, channelId)
-
-    val conf = HBaseConfiguration.create()
-    conf.set(TableInputFormat.INPUT_TABLE,
-      HBEventsUtil.tableName(namespace, appId, channelId))
-
-    val scan = HBEventsUtil.createScan(
-        startTime = startTime,
-        untilTime = untilTime,
-        entityType = entityType,
-        entityId = entityId,
-        eventNames = eventNames,
-        targetEntityType = targetEntityType,
-        targetEntityId = targetEntityId,
-        reversed = None)
-    scan.setCaching(500) // TODO
-    scan.setCacheBlocks(false) // TODO
-
-    conf.set(TableInputFormat.SCAN, PIOHBaseUtil.convertScanToString(scan))
-
-    // HBase is not accessed until this rdd is actually used.
-    val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
-      classOf[ImmutableBytesWritable],
-      classOf[Result]).map {
-        case (key, row) => HBEventsUtil.resultToEvent(row, appId)
-      }
-
-    rdd
-  }
-
-  override
-  def write(
-    events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
-
-    checkTableExists(appId, channelId)
-
-    val conf = HBaseConfiguration.create()
-    conf.set(TableOutputFormat.OUTPUT_TABLE,
-      HBEventsUtil.tableName(namespace, appId, channelId))
-    conf.setClass("mapreduce.outputformat.class",
-      classOf[TableOutputFormat[Object]],
-      classOf[OutputFormat[Object, Writable]])
-
-    events.map { event =>
-      val (put, rowKey) = HBEventsUtil.eventToPut(event, appId)
-      (new ImmutableBytesWritable(rowKey.toBytes), put)
-    }.saveAsNewAPIHadoopDataset(conf)
-
-  }
-
-}
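
A hypothetical Spark driver for the find method above could look like the
following sketch. The `pEvents` and `sc` parameters are assumed to be given,
and the Some(Some(...)) argument illustrates the optional-field semantics
documented on HBEventsUtil.createScan:

    import io.prediction.data.storage.hbase.HBPEvents
    import org.apache.spark.SparkContext

    object PEventsSketch {
      // counts "rate"/"buy" events from users targeting items
      def countByEventName(pEvents: HBPEvents, sc: SparkContext): collection.Map[String, Long] = {
        val events = pEvents.find(
          appId = 1,
          entityType = Some("user"),
          eventNames = Some(Seq("rate", "buy")),
          targetEntityType = Some(Some("item"))  // Some(None) would mean "no target entity"
        )(sc)

        // HBase is only touched when the RDD is evaluated, here by countByValue()
        events.map(_.event).countByValue()
      }
    }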

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/hbase/PIOHBaseUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/hbase/PIOHBaseUtil.scala b/data/src/main/scala/io/prediction/data/storage/hbase/PIOHBaseUtil.scala
deleted file mode 100644
index 1027930..0000000
--- a/data/src/main/scala/io/prediction/data/storage/hbase/PIOHBaseUtil.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package org.apache.hadoop.hbase.mapreduce
-
-/* Lives in the hbase.mapreduce package in order to expose its
- * package-private static function convertScanToString().
- */
-
-import org.apache.hadoop.hbase.client.Scan
-
-object PIOHBaseUtil {
-  def convertScanToString(scan: Scan): String = {
-    TableMapReduceUtil.convertScanToString(scan)
-  }
-}
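
The same trick generalizes to any package-private member: a forwarder compiled
into the library's own package may legally call such members and re-export
them publicly. Below is a self-contained, purely illustrative Scala sketch;
HBase's convertScanToString is Java package-private, and private[pkg] is the
closest Scala analogue. All names are made up:

    // a member marked private[demo] is visible to anything compiled into demo
    package object demo {
      private[demo] def hidden(x: Int): Int = x * 2
    }

    package demo {
      object Forwarder {
        def exposed(x: Int): Int = hidden(x)  // legal: same package
      }
    }

    // elsewhere: demo.Forwarder.exposed(21) == 42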

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/hbase/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/hbase/StorageClient.scala b/data/src/main/scala/io/prediction/data/storage/hbase/StorageClient.scala
deleted file mode 100644
index bfede39..0000000
--- a/data/src/main/scala/io/prediction/data/storage/hbase/StorageClient.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage.hbase
-
-import io.prediction.data.storage.BaseStorageClient
-import io.prediction.data.storage.StorageClientConfig
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.MasterNotRunningException
-import org.apache.hadoop.hbase.ZooKeeperConnectionException
-import org.apache.hadoop.hbase.client.HConnectionManager
-import org.apache.hadoop.hbase.client.HConnection
-import org.apache.hadoop.hbase.client.HBaseAdmin
-
-import grizzled.slf4j.Logging
-
-case class HBClient(
-  val conf: Configuration,
-  val connection: HConnection,
-  val admin: HBaseAdmin
-)
-
-class StorageClient(val config: StorageClientConfig)
-  extends BaseStorageClient with Logging {
-
-  val conf = HBaseConfiguration.create()
-
-  if (config.test) {
-    // use fewer retries and shorter timeout for test mode
-    conf.set("hbase.client.retries.number", "1")
-    conf.set("zookeeper.session.timeout", "30000");
-    conf.set("zookeeper.recovery.retry", "1")
-  }
-
-  try {
-    HBaseAdmin.checkHBaseAvailable(conf)
-  } catch {
-    case e: MasterNotRunningException =>
-      error("HBase master is not running (ZooKeeper ensemble: " +
-        conf.get("hbase.zookeeper.quorum") + "). Please make sure that HBase " +
-        "is running properly, and that the configuration is pointing at the " +
-        "correct ZooKeeper ensemble.")
-      throw e
-    case e: ZooKeeperConnectionException =>
-      error("Cannot connect to ZooKeeper (ZooKeeper ensemble: " +
-        conf.get("hbase.zookeeper.quorum") + "). Please make sure that the " +
-        "configuration is pointing at the correct ZooKeeper ensemble. By " +
-        "default, HBase manages its own ZooKeeper, so if you have not " +
-        "configured HBase to use an external ZooKeeper, that means your " +
-        "HBase is not started or configured properly.")
-      throw e
-    case e: Exception => {
-      error("Failed to connect to HBase." +
-        " Please check if HBase is running properly.")
-      throw e
-    }
-  }
-
-  val connection = HConnectionManager.createConnection(conf)
-
-  val client = HBClient(
-    conf = conf,
-    connection = connection,
-    admin = new HBaseAdmin(connection)
-  )
-
-  override
-  val prefix = "HB"
-}
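
The prefix override above is what lets the storage layer map abstract storage
names onto this backend, e.g. LEvents resolving to HBLEvents in this package.
A simplified, illustrative sketch of such a prefix-based lookup follows; it is
not the actual implementation:

    import scala.util.Try

    object DiscoverySketch {
      // compose "<package>.<prefix><BaseName>" and attempt to load it
      def implClassName(pkg: String, prefix: String, baseName: String): String =
        s"$pkg.$prefix$baseName"

      def main(args: Array[String]): Unit = {
        val name = implClassName("io.prediction.data.storage.hbase", "HB", "LEvents")
        // name == "io.prediction.data.storage.hbase.HBLEvents"
        println(Try(Class.forName(name)))  // Success(...) if the jar is on the classpath
      }
    }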

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/hbase/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/hbase/package.scala b/data/src/main/scala/io/prediction/data/storage/hbase/package.scala
deleted file mode 100644
index 46aa10c..0000000
--- a/data/src/main/scala/io/prediction/data/storage/hbase/package.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-/** HBase implementation of storage traits, supporting event data only
-  *
-  * @group Implementation
-  */
-package object hbase {}