Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:40 UTC

[09/34] incubator-predictionio git commit: rename all except examples

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala
new file mode 100644
index 0000000..52c8b44
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala
@@ -0,0 +1,86 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.App
+import org.apache.predictionio.data.storage.Apps
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[Apps]] */
+class JDBCApps(client: String, config: StorageClientConfig, prefix: String)
+  extends Apps with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "apps")
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      id serial not null primary key,
+      name text not null,
+      description text)""".execute().apply()
+  }
+
+  def insert(app: App): Option[Int] = DB localTx { implicit session =>
+    val q = if (app.id == 0) {
+      sql"""
+      insert into $tableName (name, description) values(${app.name}, ${app.description})
+      """
+    } else {
+      sql"""
+      insert into $tableName values(${app.id}, ${app.name}, ${app.description})
+      """
+    }
+    Some(q.updateAndReturnGeneratedKey().apply().toInt)
+  }
+
+  def get(id: Int): Option[App] = DB readOnly { implicit session =>
+    sql"SELECT id, name, description FROM $tableName WHERE id = ${id}".map(rs =>
+      App(
+        id = rs.int("id"),
+        name = rs.string("name"),
+        description = rs.stringOpt("description"))
+    ).single().apply()
+  }
+
+  def getByName(name: String): Option[App] = DB readOnly { implicit session =>
+    sql"SELECT id, name, description FROM $tableName WHERE name = ${name}".map(rs =>
+      App(
+        id = rs.int("id"),
+        name = rs.string("name"),
+        description = rs.stringOpt("description"))
+    ).single().apply()
+  }
+
+  def getAll(): Seq[App] = DB readOnly { implicit session =>
+    sql"SELECT id, name, description FROM $tableName".map(rs =>
+      App(
+        id = rs.int("id"),
+        name = rs.string("name"),
+        description = rs.stringOpt("description"))
+    ).list().apply()
+  }
+
+  def update(app: App): Unit = DB localTx { implicit session =>
+    sql"""
+    update $tableName set name = ${app.name}, description = ${app.description}
+    where id = ${app.id}""".update().apply()
+  }
+
+  def delete(id: Int): Unit = DB localTx { implicit session =>
+    sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+  }
+}
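
For readers new to ScalikeJDBC, note the two kinds of interpolation in the DAO
above, which look identical but behave very differently: $tableName is a
pre-built SQLSyntax (see JDBCUtils.prefixTableName later in this commit) and is
spliced into the statement verbatim, while plain values such as ${app.name}
become bound JDBC parameters. A minimal sketch, with illustrative values:

    import scalikejdbc._

    // `sqls.createUnsafely` yields raw SQL; safe here only because the table
    // prefix comes from the operator's configuration, never from user input.
    val table: SQLSyntax = sqls.createUnsafely("pio_apps")
    val name = "x'; drop table pio_apps; --" // bound as a parameter, so inert
    val stmt = sql"select id from $table where name = $name"
    // sent to the driver as: select id from pio_apps where name = ?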

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala
new file mode 100644
index 0000000..f94a64a
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala
@@ -0,0 +1,66 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Channel
+import org.apache.predictionio.data.storage.Channels
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[Channels]] */
+class JDBCChannels(client: String, config: StorageClientConfig, prefix: String)
+  extends Channels with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "channels")
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      id serial not null primary key,
+      name text not null,
+      appid integer not null)""".execute().apply()
+  }
+
+  def insert(channel: Channel): Option[Int] = DB localTx { implicit session =>
+    val q = if (channel.id == 0) {
+      sql"INSERT INTO $tableName (name, appid) VALUES(${channel.name}, ${channel.appid})"
+    } else {
+      sql"INSERT INTO $tableName VALUES(${channel.id}, ${channel.name}, ${channel.appid})"
+    }
+    Some(q.updateAndReturnGeneratedKey().apply().toInt)
+  }
+
+  def get(id: Int): Option[Channel] = DB localTx { implicit session =>
+    sql"SELECT id, name, appid FROM $tableName WHERE id = $id".
+      map(resultToChannel).single().apply()
+  }
+
+  def getByAppid(appid: Int): Seq[Channel] = DB localTx { implicit session =>
+    sql"SELECT id, name, appid FROM $tableName WHERE appid = $appid".
+      map(resultToChannel).list().apply()
+  }
+
+  def delete(id: Int): Unit = DB localTx { implicit session =>
+    sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+  }
+
+  def resultToChannel(rs: WrappedResultSet): Channel = {
+    Channel(
+      id = rs.int("id"),
+      name = rs.string("name"),
+      appid = rs.int("appid"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala
new file mode 100644
index 0000000..a4bd640
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala
@@ -0,0 +1,194 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineInstance
+import org.apache.predictionio.data.storage.EngineInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[EngineInstances]] */
+class JDBCEngineInstances(client: String, config: StorageClientConfig, prefix: String)
+  extends EngineInstances with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "engineinstances")
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      id varchar(100) not null primary key,
+      status text not null,
+      startTime timestamp DEFAULT CURRENT_TIMESTAMP,
+      endTime timestamp DEFAULT CURRENT_TIMESTAMP,
+      engineId text not null,
+      engineVersion text not null,
+      engineVariant text not null,
+      engineFactory text not null,
+      batch text not null,
+      env text not null,
+      sparkConf text not null,
+      datasourceParams text not null,
+      preparatorParams text not null,
+      algorithmsParams text not null,
+      servingParams text not null)""".execute().apply()
+  }
+
+  def insert(i: EngineInstance): String = DB localTx { implicit session =>
+    val id = java.util.UUID.randomUUID().toString
+    sql"""
+    INSERT INTO $tableName VALUES(
+      $id,
+      ${i.status},
+      ${i.startTime},
+      ${i.endTime},
+      ${i.engineId},
+      ${i.engineVersion},
+      ${i.engineVariant},
+      ${i.engineFactory},
+      ${i.batch},
+      ${JDBCUtils.mapToString(i.env)},
+      ${JDBCUtils.mapToString(i.sparkConf)},
+      ${i.dataSourceParams},
+      ${i.preparatorParams},
+      ${i.algorithmsParams},
+      ${i.servingParams})""".update().apply()
+    id
+  }
+
+  def get(id: String): Option[EngineInstance] = DB localTx { implicit session =>
+    sql"""
+    SELECT
+      id,
+      status,
+      startTime,
+      endTime,
+      engineId,
+      engineVersion,
+      engineVariant,
+      engineFactory,
+      batch,
+      env,
+      sparkConf,
+      datasourceParams,
+      preparatorParams,
+      algorithmsParams,
+      servingParams
+    FROM $tableName WHERE id = $id""".map(resultToEngineInstance).
+      single().apply()
+  }
+
+  def getAll(): Seq[EngineInstance] = DB localTx { implicit session =>
+    sql"""
+    SELECT
+      id,
+      status,
+      startTime,
+      endTime,
+      engineId,
+      engineVersion,
+      engineVariant,
+      engineFactory,
+      batch,
+      env,
+      sparkConf,
+      datasourceParams,
+      preparatorParams,
+      algorithmsParams,
+      servingParams
+    FROM $tableName""".map(resultToEngineInstance).list().apply()
+  }
+
+  def getLatestCompleted(
+    engineId: String,
+    engineVersion: String,
+    engineVariant: String): Option[EngineInstance] =
+    getCompleted(engineId, engineVersion, engineVariant).headOption
+
+  def getCompleted(
+    engineId: String,
+    engineVersion: String,
+    engineVariant: String): Seq[EngineInstance] = DB localTx { implicit s =>
+    sql"""
+    SELECT
+      id,
+      status,
+      startTime,
+      endTime,
+      engineId,
+      engineVersion,
+      engineVariant,
+      engineFactory,
+      batch,
+      env,
+      sparkConf,
+      datasourceParams,
+      preparatorParams,
+      algorithmsParams,
+      servingParams
+    FROM $tableName
+    WHERE
+      status = 'COMPLETED' AND
+      engineId = $engineId AND
+      engineVersion = $engineVersion AND
+      engineVariant = $engineVariant
+    ORDER BY startTime DESC""".
+      map(resultToEngineInstance).list().apply()
+  }
+
+  def update(i: EngineInstance): Unit = DB localTx { implicit session =>
+    sql"""
+    update $tableName set
+      status = ${i.status},
+      startTime = ${i.startTime},
+      endTime = ${i.endTime},
+      engineId = ${i.engineId},
+      engineVersion = ${i.engineVersion},
+      engineVariant = ${i.engineVariant},
+      engineFactory = ${i.engineFactory},
+      batch = ${i.batch},
+      env = ${JDBCUtils.mapToString(i.env)},
+      sparkConf = ${JDBCUtils.mapToString(i.sparkConf)},
+      datasourceParams = ${i.dataSourceParams},
+      preparatorParams = ${i.preparatorParams},
+      algorithmsParams = ${i.algorithmsParams},
+      servingParams = ${i.servingParams}
+    where id = ${i.id}""".update().apply()
+  }
+
+  def delete(id: String): Unit = DB localTx { implicit session =>
+    sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+  }
+
+  /** Convert JDBC results to [[EngineInstance]] */
+  def resultToEngineInstance(rs: WrappedResultSet): EngineInstance = {
+    EngineInstance(
+      id = rs.string("id"),
+      status = rs.string("status"),
+      startTime = rs.jodaDateTime("startTime"),
+      endTime = rs.jodaDateTime("endTime"),
+      engineId = rs.string("engineId"),
+      engineVersion = rs.string("engineVersion"),
+      engineVariant = rs.string("engineVariant"),
+      engineFactory = rs.string("engineFactory"),
+      batch = rs.string("batch"),
+      env = JDBCUtils.stringToMap(rs.string("env")),
+      sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")),
+      dataSourceParams = rs.string("datasourceParams"),
+      preparatorParams = rs.string("preparatorParams"),
+      algorithmsParams = rs.string("algorithmsParams"),
+      servingParams = rs.string("servingParams"))
+  }
+}
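
A hypothetical usage sketch of the completed-instance lookup above.
getLatestCompleted is getCompleted, which orders by startTime DESC, with
headOption applied; the first constructor argument is the JDBC URL (see
StorageClient later in this commit), and storageConfig is assumed in scope:

    val jdbcUrl = "jdbc:postgresql://localhost/pio" // illustrative
    // storageConfig: StorageClientConfig is assumed to be in scope
    val engineInstances = new JDBCEngineInstances(jdbcUrl, storageConfig, "pio")
    val latest: Option[EngineInstance] = engineInstances.getLatestCompleted(
      engineId = "com.example.RecommendationEngine",
      engineVersion = "0.1.0",
      engineVariant = "default")
    // None until at least one instance reaches status 'COMPLETED'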

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala
new file mode 100644
index 0000000..b766689
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala
@@ -0,0 +1,111 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.data.storage.EngineManifests
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[EngineManifests]] */
+class JDBCEngineManifests(client: String, config: StorageClientConfig, prefix: String)
+  extends EngineManifests with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "enginemanifests")
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      id varchar(100) not null primary key,
+      version text not null,
+      engineName text not null,
+      description text,
+      files text not null,
+      engineFactory text not null)""".execute().apply()
+  }
+
+  def insert(m: EngineManifest): Unit = DB localTx { implicit session =>
+    sql"""
+    INSERT INTO $tableName VALUES(
+      ${m.id},
+      ${m.version},
+      ${m.name},
+      ${m.description},
+      ${m.files.mkString(",")},
+      ${m.engineFactory})""".update().apply()
+  }
+
+  def get(id: String, version: String): Option[EngineManifest] = DB localTx { implicit session =>
+    sql"""
+    SELECT
+      id,
+      version,
+      engineName,
+      description,
+      files,
+      engineFactory
+    FROM $tableName WHERE id = $id AND version = $version""".
+      map(resultToEngineManifest).single().apply()
+  }
+
+  def getAll(): Seq[EngineManifest] = DB localTx { implicit session =>
+    sql"""
+    SELECT
+      id,
+      version,
+      engineName,
+      description,
+      files,
+      engineFactory
+    FROM $tableName""".map(resultToEngineManifest).list().apply()
+  }
+
+  def update(m: EngineManifest, upsert: Boolean = false): Unit = {
+    val r = DB localTx { implicit session =>
+      sql"""
+      update $tableName set
+        engineName = ${m.name},
+        description = ${m.description},
+        files = ${m.files.mkString(",")},
+        engineFactory = ${m.engineFactory}
+      where id = ${m.id} and version = ${m.version}""".update().apply()
+    }
+    if (r == 0) {
+      if (upsert) {
+        insert(m)
+      } else {
+        error("Cannot find a record to update, and upsert is not enabled.")
+      }
+    }
+  }
+
+  def delete(id: String, version: String): Unit = DB localTx { implicit session =>
+    sql"DELETE FROM $tableName WHERE id = $id AND version = $version".
+      update().apply()
+  }
+
+  /** Convert JDBC results to [[EngineManifest]] */
+  def resultToEngineManifest(rs: WrappedResultSet): EngineManifest = {
+    EngineManifest(
+      id = rs.string("id"),
+      version = rs.string("version"),
+      name = rs.string("engineName"),
+      description = rs.stringOpt("description"),
+      files = rs.string("files").split(","),
+      engineFactory = rs.string("engineFactory"))
+  }
+}
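
The update method above relies on the row count returned by update().apply():
zero affected rows means no (id, version) match, and the record is then
inserted only when upsert is enabled. A usage sketch, with illustrative field
values and a manifests DAO assumed in scope:

    val m = EngineManifest(
      id = "com.example.engine",
      version = "0.1.0",
      name = "My Engine",
      description = None,
      files = Seq("file:/opt/engines/my-engine.jar"),
      engineFactory = "com.example.EngineFactory")
    manifests.update(m, upsert = true) // falls back to insert if no row matches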

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala
new file mode 100644
index 0000000..1811271
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala
@@ -0,0 +1,162 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EvaluationInstance
+import org.apache.predictionio.data.storage.EvaluationInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[EvaluationInstances]] */
+class JDBCEvaluationInstances(client: String, config: StorageClientConfig, prefix: String)
+  extends EvaluationInstances with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "evaluationinstances")
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      id varchar(100) not null primary key,
+      status text not null,
+      startTime timestamp DEFAULT CURRENT_TIMESTAMP,
+      endTime timestamp DEFAULT CURRENT_TIMESTAMP,
+      evaluationClass text not null,
+      engineParamsGeneratorClass text not null,
+      batch text not null,
+      env text not null,
+      sparkConf text not null,
+      evaluatorResults text not null,
+      evaluatorResultsHTML text not null,
+      evaluatorResultsJSON text)""".execute().apply()
+  }
+
+  def insert(i: EvaluationInstance): String = DB localTx { implicit session =>
+    val id = java.util.UUID.randomUUID().toString
+    sql"""
+    INSERT INTO $tableName VALUES(
+      $id,
+      ${i.status},
+      ${i.startTime},
+      ${i.endTime},
+      ${i.evaluationClass},
+      ${i.engineParamsGeneratorClass},
+      ${i.batch},
+      ${JDBCUtils.mapToString(i.env)},
+      ${JDBCUtils.mapToString(i.sparkConf)},
+      ${i.evaluatorResults},
+      ${i.evaluatorResultsHTML},
+      ${i.evaluatorResultsJSON})""".update().apply()
+    id
+  }
+
+  def get(id: String): Option[EvaluationInstance] = DB localTx { implicit session =>
+    sql"""
+    SELECT
+      id,
+      status,
+      startTime,
+      endTime,
+      evaluationClass,
+      engineParamsGeneratorClass,
+      batch,
+      env,
+      sparkConf,
+      evaluatorResults,
+      evaluatorResultsHTML,
+      evaluatorResultsJSON
+    FROM $tableName WHERE id = $id
+    """.map(resultToEvaluationInstance).single().apply()
+  }
+
+  def getAll(): Seq[EvaluationInstance] = DB localTx { implicit session =>
+    sql"""
+    SELECT
+      id,
+      status,
+      startTime,
+      endTime,
+      evaluationClass,
+      engineParamsGeneratorClass,
+      batch,
+      env,
+      sparkConf,
+      evaluatorResults,
+      evaluatorResultsHTML,
+      evaluatorResultsJSON
+    FROM $tableName
+    """.map(resultToEvaluationInstance).list().apply()
+  }
+
+  def getCompleted(): Seq[EvaluationInstance] = DB localTx { implicit s =>
+    sql"""
+    SELECT
+      id,
+      status,
+      startTime,
+      endTime,
+      evaluationClass,
+      engineParamsGeneratorClass,
+      batch,
+      env,
+      sparkConf,
+      evaluatorResults,
+      evaluatorResultsHTML,
+      evaluatorResultsJSON
+    FROM $tableName
+    WHERE
+      status = 'EVALCOMPLETED'
+    ORDER BY startTime DESC
+    """.map(resultToEvaluationInstance).list().apply()
+  }
+
+  def update(i: EvaluationInstance): Unit = DB localTx { implicit session =>
+    sql"""
+    update $tableName set
+      status = ${i.status},
+      startTime = ${i.startTime},
+      endTime = ${i.endTime},
+      evaluationClass = ${i.evaluationClass},
+      engineParamsGeneratorClass = ${i.engineParamsGeneratorClass},
+      batch = ${i.batch},
+      env = ${JDBCUtils.mapToString(i.env)},
+      sparkConf = ${JDBCUtils.mapToString(i.sparkConf)},
+      evaluatorResults = ${i.evaluatorResults},
+      evaluatorResultsHTML = ${i.evaluatorResultsHTML},
+      evaluatorResultsJSON = ${i.evaluatorResultsJSON}
+    where id = ${i.id}""".update().apply()
+  }
+
+  def delete(id: String): Unit = DB localTx { implicit session =>
+    sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+  }
+
+  /** Convert JDBC results to [[EvaluationInstance]] */
+  def resultToEvaluationInstance(rs: WrappedResultSet): EvaluationInstance = {
+    EvaluationInstance(
+      id = rs.string("id"),
+      status = rs.string("status"),
+      startTime = rs.jodaDateTime("startTime"),
+      endTime = rs.jodaDateTime("endTime"),
+      evaluationClass = rs.string("evaluationClass"),
+      engineParamsGeneratorClass = rs.string("engineParamsGeneratorClass"),
+      batch = rs.string("batch"),
+      env = JDBCUtils.stringToMap(rs.string("env")),
+      sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")),
+      evaluatorResults = rs.string("evaluatorResults"),
+      evaluatorResultsHTML = rs.string("evaluatorResultsHTML"),
+      evaluatorResultsJSON = rs.string("evaluatorResultsJSON"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
new file mode 100644
index 0000000..945879c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
@@ -0,0 +1,241 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+import org.json4s.JObject
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+import scalikejdbc._
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+/** JDBC implementation of [[LEvents]] */
+class JDBCLEvents(
+    client: String,
+    config: StorageClientConfig,
+    namespace: String) extends LEvents with Logging {
+  implicit private val formats = org.json4s.DefaultFormats
+
+  def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+
+    // Indexed columns must be bounded VARCHARs (MySQL cannot index TEXT
+    // columns without a prefix length), so the indexed variant below uses
+    // varchar(255) instead of text.
+    val useIndex = config.properties.contains("INDEX") &&
+      config.properties("INDEX").equalsIgnoreCase("enabled")
+
+    val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
+    val entityIdIndexName = s"idx_${tableName}_ei"
+    val entityTypeIndexName = s"idx_${tableName}_et"
+    DB autoCommit { implicit session =>
+      if (useIndex) {
+        SQL(s"""
+      create table if not exists $tableName (
+        id varchar(32) not null primary key,
+        event varchar(255) not null,
+        entityType varchar(255) not null,
+        entityId varchar(255) not null,
+        targetEntityType text,
+        targetEntityId text,
+        properties text,
+        eventTime timestamp DEFAULT CURRENT_TIMESTAMP,
+        eventTimeZone varchar(50) not null,
+        tags text,
+        prId text,
+        creationTime timestamp DEFAULT CURRENT_TIMESTAMP,
+        creationTimeZone varchar(50) not null)""").execute().apply()
+
+        // create index
+        SQL(s"create index $entityIdIndexName on $tableName (entityId)").execute().apply()
+        SQL(s"create index $entityTypeIndexName on $tableName (entityType)").execute().apply()
+      } else {
+        SQL(s"""
+      create table if not exists $tableName (
+        id varchar(32) not null primary key,
+        event text not null,
+        entityType text not null,
+        entityId text not null,
+        targetEntityType text,
+        targetEntityId text,
+        properties text,
+        eventTime timestamp DEFAULT CURRENT_TIMESTAMP,
+        eventTimeZone varchar(50) not null,
+        tags text,
+        prId text,
+        creationTime timestamp DEFAULT CURRENT_TIMESTAMP,
+        creationTimeZone varchar(50) not null)""").execute().apply()
+      }
+      true
+    }
+  }
+
+  def remove(appId: Int, channelId: Option[Int] = None): Boolean =
+    DB autoCommit { implicit session =>
+      SQL(s"""
+      drop table ${JDBCUtils.eventTableName(namespace, appId, channelId)}
+      """).execute().apply()
+      true
+    }
+
+  def close(): Unit = ConnectionPool.closeAll()
+
+  def futureInsert(event: Event, appId: Int, channelId: Option[Int])(
+    implicit ec: ExecutionContext): Future[String] = Future {
+    DB localTx { implicit session =>
+      val id = event.eventId.getOrElse(JDBCUtils.generateId)
+      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+      sql"""
+      insert into $tableName values(
+        $id,
+        ${event.event},
+        ${event.entityType},
+        ${event.entityId},
+        ${event.targetEntityType},
+        ${event.targetEntityId},
+        ${write(event.properties.toJObject)},
+        ${event.eventTime},
+        ${event.eventTime.getZone.getID},
+        ${if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None},
+        ${event.prId},
+        ${event.creationTime},
+        ${event.creationTime.getZone.getID}
+      )
+      """.update().apply()
+      id
+    }
+  }
+
+  def futureGet(eventId: String, appId: Int, channelId: Option[Int])(
+    implicit ec: ExecutionContext): Future[Option[Event]] = Future {
+    DB readOnly { implicit session =>
+      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+      sql"""
+      select
+        id,
+        event,
+        entityType,
+        entityId,
+        targetEntityType,
+        targetEntityId,
+        properties,
+        eventTime,
+        eventTimeZone,
+        tags,
+        prId,
+        creationTime,
+        creationTimeZone
+      from $tableName
+      where id = $eventId
+      """.map(resultToEvent).single().apply()
+    }
+  }
+
+  def futureDelete(eventId: String, appId: Int, channelId: Option[Int])(
+    implicit ec: ExecutionContext): Future[Boolean] = Future {
+    DB localTx { implicit session =>
+      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+      sql"""
+      delete from $tableName where id = $eventId
+      """.update().apply()
+      true
+    }
+  }
+
+  def futureFind(
+      appId: Int,
+      channelId: Option[Int] = None,
+      startTime: Option[DateTime] = None,
+      untilTime: Option[DateTime] = None,
+      entityType: Option[String] = None,
+      entityId: Option[String] = None,
+      eventNames: Option[Seq[String]] = None,
+      targetEntityType: Option[Option[String]] = None,
+      targetEntityId: Option[Option[String]] = None,
+      limit: Option[Int] = None,
+      reversed: Option[Boolean] = None
+    )(implicit ec: ExecutionContext): Future[Iterator[Event]] = Future {
+    DB readOnly { implicit session =>
+      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+      val whereClause = sqls.toAndConditionOpt(
+        startTime.map(x => sqls"eventTime >= $x"),
+        untilTime.map(x => sqls"eventTime < $x"),
+        entityType.map(x => sqls"entityType = $x"),
+        entityId.map(x => sqls"entityId = $x"),
+        eventNames.map(x =>
+          sqls.toOrConditionOpt(x.map(y =>
+            Some(sqls"event = $y")
+          ): _*)
+        ).getOrElse(None),
+        targetEntityType.map(x => x.map(y => sqls"targetEntityType = $y")
+            .getOrElse(sqls"targetEntityType IS NULL")),
+        targetEntityId.map(x => x.map(y => sqls"targetEntityId = $y")
+            .getOrElse(sqls"targetEntityId IS NULL"))
+      ).map(sqls.where(_)).getOrElse(sqls"")
+      val orderByClause = reversed.map(x =>
+        if (x) sqls"eventTime desc" else sqls"eventTime asc"
+      ).getOrElse(sqls"eventTime asc")
+      val limitClause = limit.map(x =>
+        if (x < 0) sqls"" else sqls.limit(x)
+      ).getOrElse(sqls"")
+      val q = sql"""
+      select
+        id,
+        event,
+        entityType,
+        entityId,
+        targetEntityType,
+        targetEntityId,
+        properties,
+        eventTime,
+        eventTimeZone,
+        tags,
+        prId,
+        creationTime,
+        creationTimeZone
+      from $tableName
+      $whereClause
+      order by $orderByClause
+      $limitClause
+      """
+      q.map(resultToEvent).list().apply().toIterator
+    }
+  }
+
+  private[predictionio] def resultToEvent(rs: WrappedResultSet): Event = {
+    Event(
+      eventId = rs.stringOpt("id"),
+      event = rs.string("event"),
+      entityType = rs.string("entityType"),
+      entityId = rs.string("entityId"),
+      targetEntityType = rs.stringOpt("targetEntityType"),
+      targetEntityId = rs.stringOpt("targetEntityId"),
+      properties = rs.stringOpt("properties").map(p =>
+        DataMap(read[JObject](p))).getOrElse(DataMap()),
+      eventTime = new DateTime(rs.jodaDateTime("eventTime"),
+        DateTimeZone.forID(rs.string("eventTimeZone"))),
+      tags = rs.stringOpt("tags").map(t => t.split(",").toList).getOrElse(Nil),
+      prId = rs.stringOpt("prId"),
+      creationTime = new DateTime(rs.jodaDateTime("creationTime"),
+        DateTimeZone.forID(rs.string("creationTimeZone")))
+    )
+  }
+}
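
The WHERE clause in futureFind is assembled from optional fragments:
sqls.toAndConditionOpt drops every None argument and returns None when all
filters are absent, so unspecified parameters simply vanish from the query.
A minimal sketch of the same pattern:

    import scalikejdbc._

    val entityType: Option[String] = Some("user")
    val entityId: Option[String] = None
    val whereClause = sqls.toAndConditionOpt(
      entityType.map(x => sqls"entityType = $x"),
      entityId.map(x => sqls"entityId = $x")
    ).map(sqls.where(_)).getOrElse(sqls"")
    // yields "where entityType = ?"; the absent entityId filter is dropped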

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
new file mode 100644
index 0000000..01ed6ca
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
@@ -0,0 +1,52 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[Models]] */
+class JDBCModels(client: String, config: StorageClientConfig, prefix: String)
+  extends Models with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "models")
+
+  /** Determines binary column type based on JDBC driver type */
+  val binaryColumnType = JDBCUtils.binaryColumnType(client)
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      id varchar(100) not null primary key,
+      models $binaryColumnType not null)""".execute().apply()
+  }
+
+  def insert(i: Model): Unit = DB localTx { implicit session =>
+    sql"insert into $tableName values(${i.id}, ${i.models})".update().apply()
+  }
+
+  def get(id: String): Option[Model] = DB readOnly { implicit session =>
+    sql"select id, models from $tableName where id = $id".map { r =>
+      Model(id = r.string("id"), models = r.bytes("models"))
+    }.single().apply()
+  }
+
+  def delete(id: String): Unit = DB localTx { implicit session =>
+    sql"delete from $tableName where id = $id".execute().apply()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
new file mode 100644
index 0000000..c01989c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
@@ -0,0 +1,160 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import java.sql.{DriverManager, ResultSet}
+
+import com.github.nscala_time.time.Imports._
+import org.apache.predictionio.data.storage.{DataMap, Event, PEvents, StorageClientConfig}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.{JdbcRDD, RDD}
+import org.apache.spark.sql.{SQLContext, SaveMode}
+import org.json4s.JObject
+import org.json4s.native.Serialization
+
+/** JDBC implementation of [[PEvents]] */
+class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String) extends PEvents {
+  @transient private implicit lazy val formats = org.json4s.DefaultFormats
+  def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = {
+    val lower = startTime.map(_.getMillis).getOrElse(0.toLong)
+    // Default the upper bound to now + 1 year (instead of the previous
+    // +100 years) because MySQL's FROM_UNIXTIME() returns NULL for
+    // timestamps that far in the future.
+    val upper = untilTime.map(_.getMillis).getOrElse((DateTime.now + 1.years).getMillis)
+    val par = scala.math.min(
+      new Duration(upper - lower).getStandardDays,
+      config.properties.getOrElse("PARTITIONS", "4").toLong).toInt
+    val entityTypeClause = entityType.map(x => s"and entityType = '$x'").getOrElse("")
+    val entityIdClause = entityId.map(x => s"and entityId = '$x'").getOrElse("")
+    val eventNamesClause =
+      eventNames.map("and (" + _.map(y => s"event = '$y'").mkString(" or ") + ")").getOrElse("")
+    val targetEntityTypeClause = targetEntityType.map(
+      _.map(x => s"and targetEntityType = '$x'"
+    ).getOrElse("and targetEntityType is null")).getOrElse("")
+    val targetEntityIdClause = targetEntityId.map(
+      _.map(x => s"and targetEntityId = '$x'"
+    ).getOrElse("and targetEntityId is null")).getOrElse("")
+    val q = s"""
+      select
+        id,
+        event,
+        entityType,
+        entityId,
+        targetEntityType,
+        targetEntityId,
+        properties,
+        eventTime,
+        eventTimeZone,
+        tags,
+        prId,
+        creationTime,
+        creationTimeZone
+      from ${JDBCUtils.eventTableName(namespace, appId, channelId)}
+      where
+        eventTime >= ${JDBCUtils.timestampFunction(client)}(?) and
+        eventTime < ${JDBCUtils.timestampFunction(client)}(?)
+      $entityTypeClause
+      $entityIdClause
+      $eventNamesClause
+      $targetEntityTypeClause
+      $targetEntityIdClause
+      """.replace("\n", " ")
+    new JdbcRDD(
+      sc,
+      () => {
+        DriverManager.getConnection(
+          client,
+          config.properties("USERNAME"),
+          config.properties("PASSWORD"))
+      },
+      q,
+      lower / 1000,
+      upper / 1000,
+      par,
+      (r: ResultSet) => {
+        Event(
+          eventId = Option(r.getString("id")),
+          event = r.getString("event"),
+          entityType = r.getString("entityType"),
+          entityId = r.getString("entityId"),
+          targetEntityType = Option(r.getString("targetEntityType")),
+          targetEntityId = Option(r.getString("targetEntityId")),
+          properties = Option(r.getString("properties")).map(x =>
+            DataMap(Serialization.read[JObject](x))).getOrElse(DataMap()),
+          eventTime = new DateTime(r.getTimestamp("eventTime").getTime,
+            DateTimeZone.forID(r.getString("eventTimeZone"))),
+          tags = Option(r.getString("tags")).map(x =>
+            x.split(",").toList).getOrElse(Nil),
+          prId = Option(r.getString("prId")),
+          creationTime = new DateTime(r.getTimestamp("creationTime").getTime,
+            DateTimeZone.forID(r.getString("creationTimeZone"))))
+      }).cache()
+  }
+
+  def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val sqlContext = new SQLContext(sc)
+
+    import sqlContext.implicits._
+
+    val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
+
+    val eventTableColumns = Seq[String](
+        "id"
+      , "event"
+      , "entityType"
+      , "entityId"
+      , "targetEntityType"
+      , "targetEntityId"
+      , "properties"
+      , "eventTime"
+      , "eventTimeZone"
+      , "tags"
+      , "prId"
+      , "creationTime"
+      , "creationTimeZone")
+
+    val eventDF = events.map { event =>
+      (event.eventId.getOrElse(JDBCUtils.generateId)
+        , event.event
+        , event.entityType
+        , event.entityId
+        , event.targetEntityType.orNull
+        , event.targetEntityId.orNull
+        , if (!event.properties.isEmpty) Serialization.write(event.properties.toJObject) else null
+        , new java.sql.Timestamp(event.eventTime.getMillis)
+        , event.eventTime.getZone.getID
+        , if (event.tags.nonEmpty) event.tags.mkString(",") else null
+        , event.prId
+        , new java.sql.Timestamp(event.creationTime.getMillis)
+        , event.creationTime.getZone.getID)
+    }.toDF(eventTableColumns:_*)
+
+    // DataFrameWriter.jdbc requires Spark 1.4.0 or higher
+    val prop = new java.util.Properties
+    prop.setProperty("user", config.properties("USERNAME"))
+    prop.setProperty("password", config.properties("PASSWORD"))
+    eventDF.write.mode(SaveMode.Append).jdbc(client, tableName, prop)
+  }
+}
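
The JdbcRDD above requires exactly two ? placeholders; Spark fills them with
each partition's slice of [lower / 1000, upper / 1000] (epoch seconds), which
the driver-specific function (to_timestamp or from_unixtime) converts back to
timestamps. The partition count is capped by the window's span in days. A
worked sketch of that arithmetic, with illustrative values:

    import org.joda.time.Duration

    val lower = 0L                       // window start, epoch millis
    val upper = 3L * 24 * 60 * 60 * 1000 // three days later
    val partitions = math.min(
      new Duration(upper - lower).getStandardDays, // 3
      "4".toLong                                   // default PARTITIONS
    ).toInt                                        // => 3 partitions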

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
new file mode 100644
index 0000000..e95b49b
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
@@ -0,0 +1,103 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import scalikejdbc._
+
+/** JDBC related utilities */
+object JDBCUtils {
+  /** Extract JDBC driver type from URL
+    *
+    * @param url JDBC URL
+    * @return The driver type, e.g. postgresql
+    */
+  def driverType(url: String): String = {
+    val capture = """jdbc:([^:]+):""".r
+    capture findFirstIn url match {
+      case Some(capture(driverType)) => driverType
+      case None => ""
+    }
+  }
+
+  /** Determines binary column type from JDBC URL
+    *
+    * @param url JDBC URL
+    * @return Binary column type as SQLSyntax, e.g. LONGBLOB
+    */
+  def binaryColumnType(url: String): SQLSyntax = {
+    driverType(url) match {
+      case "postgresql" => sqls"bytea"
+      case "mysql" => sqls"longblob"
+      case _ => sqls"longblob"
+    }
+  }
+
+  /** Determines UNIX timestamp conversion function from JDBC URL
+    *
+    * @param url JDBC URL
+    * @return Timestamp conversion function, e.g. TO_TIMESTAMP
+    */
+  def timestampFunction(url: String): String = {
+    driverType(url) match {
+      case "postgresql" => "to_timestamp"
+      case "mysql" => "from_unixtime"
+      case _ => "from_unixtime"
+    }
+  }
+
+  /** Converts a Map[String, String] to a comma-separated list of key=value
+    * pairs; keys and values must contain neither ',' nor '=' to round-trip.
+    *
+    * @param m Map of String to String
+    * @return Comma-separated list, e.g. FOO=BAR,X=Y,...
+    */
+  def mapToString(m: Map[String, String]): String = {
+    m.map(t => s"${t._1}=${t._2}").mkString(",")
+  }
+
+  /** Inverse of mapToString
+    *
+    * @param str Comma-separated list, e.g. FOO=BAR,X=Y,...
+    * @return Map of String to String, e.g. Map("FOO" -> "BAR", "X" -> "Y", ...)
+    */
+  def stringToMap(str: String): Map[String, String] = {
+    str.split(",").map { x =>
+      val y = x.split("=")
+      y(0) -> y(1)
+    }.toMap[String, String]
+  }
+
+  /** Generate 32-character random ID using UUID with - stripped */
+  def generateId: String = java.util.UUID.randomUUID().toString.replace("-", "")
+
+  /** Prefix a table name
+    *
+    * @param prefix Table prefix
+    * @param table Table name
+    * @return Prefixed table name
+    */
+  def prefixTableName(prefix: String, table: String): SQLSyntax =
+    sqls.createUnsafely(s"${prefix}_$table")
+
+  /** Derive event table name
+    *
+    * @param namespace Namespace of event tables
+    * @param appId App ID
+    * @param channelId Optional channel ID
+    * @return Full event table name
+    */
+  def eventTableName(namespace: String, appId: Int, channelId: Option[Int]): String =
+    s"${namespace}_${appId}${channelId.map("_" + _).getOrElse("")}"
+}
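
Round-trip sketches for the helpers above, with illustrative values. Note that
mapToString and stringToMap round-trip only when keys and values contain
neither ',' nor '=':

    JDBCUtils.driverType("jdbc:postgresql://localhost/pio")  // "postgresql"
    JDBCUtils.binaryColumnType("jdbc:mysql://localhost/pio") // sqls"longblob"
    JDBCUtils.eventTableName("pio_event", 1, Some(2))        // "pio_event_1_2"
    JDBCUtils.eventTableName("pio_event", 1, None)           // "pio_event_1"

    val env = Map("PIO_FS_BASEDIR" -> "/tmp/pio", "PIO_HOME" -> "/opt/pio")
    val flat = JDBCUtils.mapToString(env) // "PIO_FS_BASEDIR=/tmp/pio,PIO_HOME=/opt/pio"
    assert(JDBCUtils.stringToMap(flat) == env)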

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
new file mode 100644
index 0000000..6015870
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
@@ -0,0 +1,50 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import scalikejdbc._
+
+/** JDBC implementation of [[BaseStorageClient]] */
+class StorageClient(val config: StorageClientConfig)
+  extends BaseStorageClient with Logging {
+  override val prefix = "JDBC"
+
+  if (!config.properties.contains("URL")) {
+    throw new StorageClientException("The URL variable is not set!", null)
+  }
+  if (!config.properties.contains("USERNAME")) {
+    throw new StorageClientException("The USERNAME variable is not set!", null)
+  }
+  if (!config.properties.contains("PASSWORD")) {
+    throw new StorageClientException("The PASSWORD variable is not set!", null)
+  }
+
+  // set max size of connection pool
+  val maxSize: Int = config.properties.getOrElse("CONNECTIONS", "8").toInt
+  val settings = ConnectionPoolSettings(maxSize = maxSize)
+
+  ConnectionPool.singleton(
+    config.properties("URL"),
+    config.properties("USERNAME"),
+    config.properties("PASSWORD"),
+    settings)
+  /** JDBC connection URL. Connections are managed by ScalikeJDBC. */
+  val client = config.properties("URL")
+}
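
The properties this client reads, shown as a plain map with illustrative
values (URL, USERNAME, and PASSWORD are required; CONNECTIONS is optional):

    val jdbcProperties = Map(
      "URL"         -> "jdbc:postgresql://localhost/pio",
      "USERNAME"    -> "pio",
      "PASSWORD"    -> "pio",
      "CONNECTIONS" -> "8") // connection pool size, defaults to 8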

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
new file mode 100644
index 0000000..c423b29
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
@@ -0,0 +1,23 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+/** JDBC implementation of storage traits, supporting meta data, event data, and
+  * model data
+  *
+  * @group Implementation
+  */
+package object jdbc {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
new file mode 100644
index 0000000..82989aa
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
@@ -0,0 +1,59 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.localfs
+
+import java.io.File
+import java.io.FileNotFoundException
+import java.io.FileOutputStream
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+
+import scala.io.Source
+
+class LocalFSModels(f: File, config: StorageClientConfig, prefix: String)
+  extends Models with Logging {
+
+  def insert(i: Model): Unit = {
+    try {
+      val fos = new FileOutputStream(new File(f, s"${prefix}${i.id}"))
+      try fos.write(i.models) finally fos.close()
+    } catch {
+      case e: FileNotFoundException => error(e.getMessage)
+    }
+  }
+
+  def get(id: String): Option[Model] = {
+    try {
+      Some(Model(
+        id = id,
+        models = Source.fromFile(new File(f, s"${prefix}${id}"))(
+          scala.io.Codec.ISO8859).map(_.toByte).toArray))
+    } catch {
+      case e: Throwable =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def delete(id: String): Unit = {
+    val m = new File(f, s"${prefix}${id}")
+    if (!m.delete) error(s"Unable to delete ${m.getCanonicalPath}!")
+  }
+}
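
A round-trip sketch of the file-backed store above; each model is persisted as
a single file named prefix + id. Paths are illustrative, and config (which
this class accepts but never uses) is assumed in scope:

    val models = new LocalFSModels(
      new java.io.File("/tmp/pio-models"), config, "pio_model_")
    models.insert(Model(id = "abc123", models = Array[Byte](1, 2, 3)))
    val loaded: Option[Model] = models.get("abc123")
    // reads back /tmp/pio-models/pio_model_abc123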

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala
new file mode 100644
index 0000000..8206384
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala
@@ -0,0 +1,43 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage.localfs
+
+import java.io.File
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+    with Logging {
+  override val prefix = "LocalFS"
+  val f = new File(
+    config.properties.getOrElse("PATH", config.properties("HOSTS")))
+  if (f.exists) {
+    if (!f.isDirectory) throw new StorageClientException(
+      s"${f} already exists but it is not a directory!",
+      null)
+    if (!f.canWrite) throw new StorageClientException(
+      s"${f} already exists but it is not writable!",
+      null)
+  } else {
+    if (!f.mkdirs) throw new StorageClientException(
+      s"${f} does not exist and automatic creation failed!",
+      null)
+  }
+  val client = f
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala
new file mode 100644
index 0000000..f245a06
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala
@@ -0,0 +1,22 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+/** Local file system implementation of storage traits, supporting model data only
+  *
+  * @group Implementation
+  */
+package object localfs {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/package.scala
new file mode 100644
index 0000000..09e6fa3
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/package.scala
@@ -0,0 +1,26 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data
+
+/** If you are an engine developer, please refer to the [[store]] package.
+  *
+  * This package provides convenient access to underlying data access objects.
+  * The common entry point is [[Storage]].
+  *
+  * Developer APIs are available to advanced developers to add support for
+  * other data store backends.
+  */
+package object storage {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/Common.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/Common.scala b/data/src/main/scala/org/apache/predictionio/data/store/Common.scala
new file mode 100644
index 0000000..81b4b28
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/Common.scala
@@ -0,0 +1,50 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.store
+
+import org.apache.predictionio.data.storage.Storage
+import grizzled.slf4j.Logger
+
+private[predictionio] object Common {
+
+  @transient lazy val logger = Logger[this.type]
+  @transient lazy private val appsDb = Storage.getMetaDataApps()
+  @transient lazy private val channelsDb = Storage.getMetaDataChannels()
+
+  /* Throws IllegalArgumentException if the app name or channel name is invalid. */
+  def appNameToId(appName: String, channelName: Option[String]): (Int, Option[Int]) = {
+    val appOpt = appsDb.getByName(appName)
+
+    appOpt.map { app =>
+      val channelMap: Map[String, Int] = channelsDb.getByAppid(app.id)
+        .map(c => (c.name, c.id)).toMap
+
+      val channelId: Option[Int] = channelName.map { ch =>
+        if (channelMap.contains(ch)) {
+          channelMap(ch)
+        } else {
+          logger.error(s"Invalid channel name ${ch}.")
+          throw new IllegalArgumentException(s"Invalid channel name ${ch}.")
+        }
+      }
+
+      (app.id, channelId)
+    }.getOrElse {
+      logger.error(s"Invalid app name ${appName}")
+      throw new IllegalArgumentException(s"Invalid app name ${appName}")
+    }
+  }
+}
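
    [editor's note] A short sketch of resolving names to storage IDs from code inside
    the predictionio package (Common is package-private); the app and channel names
    are hypothetical:

        // Returns (appId, Some(channelId)); throws IllegalArgumentException
        // if either name is unknown.
        val (appId, channelId) = Common.appNameToId("MyApp", Some("web"))

        // With the default channel, the returned channelId is None.
        val (appId2, noChannel) = Common.appNameToId("MyApp", None)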

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
new file mode 100644
index 0000000..ae38e7b
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
@@ -0,0 +1,142 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.store
+
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.data.storage.Event
+
+import org.joda.time.DateTime
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+
+/** This object provides a set of operations to access the Event Store
+  * without going through Spark's parallelization.
+  */
+object LEventStore {
+
+  private val defaultTimeout = Duration(60, "seconds")
+
+  @transient lazy private val eventsDb = Storage.getLEvents()
+
+  /** Reads events of the specified entity. Use this in an Algorithm's predict()
+    * or in Serving logic for fast event store access.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param latest Return latest event first (default true)
+    * @return Iterator[Event]
+    */
+  def findByEntity(
+    appName: String,
+    entityType: String,
+    entityId: String,
+    channelName: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None,
+    latest: Boolean = true,
+    timeout: Duration = defaultTimeout): Iterator[Event] = {
+
+    val (appId, channelId) = Common.appNameToId(appName, channelName)
+
+    Await.result(eventsDb.futureFind(
+      appId = appId,
+      channelId = channelId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = Some(entityType),
+      entityId = Some(entityId),
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      limit = limit,
+      reversed = Some(latest)),
+      timeout)
+  }
+
+  /** Reads events generically. If entityType or entityId is not specified, this
+    * results in a full table scan.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    *   - None means no restriction on entityType
+    *   - Some(x) means entityType should match x.
+    * @param entityId return events of this entityId
+    *   - None means no restriction on entityId
+    *   - Some(x) means entityId should match x.
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @return Iterator[Event]
+    */
+  def find(
+    appName: String,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    channelName: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None,
+    timeout: Duration = defaultTimeout): Iterator[Event] = {
+
+    val (appId, channelId) = Common.appNameToId(appName, channelName)
+
+    Await.result(eventsDb.futureFind(
+      appId = appId,
+      channelId = channelId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = entityType,
+      entityId = entityId,
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      limit = limit), timeout)
+  }
+
+}
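
    [editor's note] A sketch of predict()-time usage; the app, entity, and event
    names are hypothetical, and a short timeout keeps serving latency bounded:

        import org.apache.predictionio.data.storage.Event
        import org.apache.predictionio.data.store.LEventStore
        import scala.concurrent.duration.Duration

        // Last 10 "view" events of user "u1", most recent first.
        val recentViews: Iterator[Event] = LEventStore.findByEntity(
          appName = "MyApp",
          entityType = "user",
          entityId = "u1",
          eventNames = Some(Seq("view")),
          limit = Some(10),
          latest = true,
          timeout = Duration(200, "millis"))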

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/PEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/PEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/PEventStore.scala
new file mode 100644
index 0000000..b8f0037
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/PEventStore.scala
@@ -0,0 +1,116 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.store
+
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PropertyMap
+
+import org.joda.time.DateTime
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+/** This object provides a set of operations to access the Event Store
+  * with Spark's parallelization.
+  */
+object PEventStore {
+
+  @transient lazy private val eventsDb = Storage.getPEvents()
+
+  /** Read events from Event Store
+    *
+    * @param appName return events of this app
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param sc Spark context
+    * @return RDD[Event]
+    */
+  def find(
+    appName: String,
+    channelName: Option[String] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None
+  )(sc: SparkContext): RDD[Event] = {
+
+    val (appId, channelId) = Common.appNameToId(appName, channelName)
+
+    eventsDb.find(
+      appId = appId,
+      channelId = channelId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = entityType,
+      entityId = entityId,
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId
+    )(sc)
+
+  }
+
+  /** Aggregate properties of entities based on these special events:
+    * \$set, \$unset, \$delete events.
+    *
+    * @param appName use events of this app
+    * @param entityType aggregate properties of the entities of this entityType
+    * @param channelName use events of this channel (default channel if it's None)
+    * @param startTime use events with eventTime >= startTime
+    * @param untilTime use events with eventTime < untilTime
+    * @param required only keep entities with these required properties defined
+    * @param sc Spark context
+    * @return RDD[(String, PropertyMap)] RDD of entityId and PropertyMap pairs
+    */
+  def aggregateProperties(
+    appName: String,
+    entityType: String,
+    channelName: Option[String] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    required: Option[Seq[String]] = None)
+    (sc: SparkContext): RDD[(String, PropertyMap)] = {
+
+      val (appId, channelId) = Common.appNameToId(appName, channelName)
+
+      eventsDb.aggregateProperties(
+        appId = appId,
+        entityType = entityType,
+        channelId = channelId,
+        startTime = startTime,
+        untilTime = untilTime,
+        required = required
+      )(sc)
+
+    }
+
+}
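
    [editor's note] A sketch of readTraining()-time usage inside a DataSource; the
    app, entity, and event names are hypothetical:

        import org.apache.predictionio.data.storage.{Event, PropertyMap}
        import org.apache.predictionio.data.store.PEventStore
        import org.apache.spark.SparkContext
        import org.apache.spark.rdd.RDD

        def readTraining(sc: SparkContext): Unit = {
          // Latest properties of every "user" entity, obtained by replaying
          // $set/$unset/$delete events.
          val usersRDD: RDD[(String, PropertyMap)] = PEventStore.aggregateProperties(
            appName = "MyApp",
            entityType = "user")(sc)

          // All user-views-item events.
          val viewsRDD: RDD[Event] = PEventStore.find(
            appName = "MyApp",
            entityType = Some("user"),
            eventNames = Some(Seq("view")),
            targetEntityType = Some(Some("item")))(sc)
        }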

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
new file mode 100644
index 0000000..fa14daf
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
@@ -0,0 +1,142 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.store.java
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.LEventStore
+import org.joda.time.DateTime
+
+import scala.collection.JavaConversions
+import scala.concurrent.duration.Duration
+
+/** This Java-friendly object provides a set of operations to access the
+  * Event Store without going through Spark's parallelization.
+  */
+object LJavaEventStore {
+
+  /** Reads events of the specified entity. Use this in an Algorithm's predict()
+    * or in Serving logic for fast event store access.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param latest Return latest event first
+    * @return java.util.List[Event]
+    */
+  def findByEntity(
+    appName: String,
+    entityType: String,
+    entityId: String,
+    channelName: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    limit: Option[Integer],
+    latest: Boolean,
+    timeout: Duration): java.util.List[Event] = {
+
+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+    val limitInt = limit.map(_.intValue())
+
+    JavaConversions.seqAsJavaList(
+      LEventStore.findByEntity(
+        appName,
+        entityType,
+        entityId,
+        channelName,
+        eventNamesSeq,
+        targetEntityType,
+        targetEntityId,
+        startTime,
+        untilTime,
+        limitInt,
+        latest,
+        timeout
+      ).toSeq)
+  }
+
+  /** Reads events generically. If entityType or entityId is not specified, this
+    * results in a full table scan.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    *   - None means no restriction on entityType
+    *   - Some(x) means entityType should match x.
+    * @param entityId return events of this entityId
+    *   - None means no restriction on entityId
+    *   - Some(x) means entityId should match x.
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @return java.util.List[Event]
+    */
+  def find(
+    appName: String,
+    entityType: Option[String],
+    entityId: Option[String],
+    channelName: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    limit: Option[Integer],
+    timeout: Duration): java.util.List[Event] = {
+
+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+    val limitInt = limit.map(_.intValue())
+
+    JavaConversions.seqAsJavaList(
+      LEventStore.find(
+        appName,
+        entityType,
+        entityId,
+        channelName,
+        eventNamesSeq,
+        targetEntityType,
+        targetEntityId,
+        startTime,
+        untilTime,
+        limitInt,
+        timeout
+      ).toSeq)
+  }
+}
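
    [editor's note] Because the Java-friendly methods declare no default arguments,
    every parameter is spelled out. A sketch of the same lookup as in LEventStore
    above, written in Scala to match this commit (a Java caller would build the
    Options with OptionHelper below); names are hypothetical:

        import java.util.Arrays
        import org.apache.predictionio.data.storage.Event
        import scala.concurrent.duration.Duration

        val events: java.util.List[Event] = LJavaEventStore.findByEntity(
          "MyApp",                              // appName
          "user",                               // entityType
          "u1",                                 // entityId
          None,                                 // channelName (default channel)
          Some(Arrays.asList("view")),          // eventNames
          None,                                 // targetEntityType
          None,                                 // targetEntityId
          None,                                 // startTime
          None,                                 // untilTime
          Some(java.lang.Integer.valueOf(10)),  // limit
          true,                                 // latest
          Duration(200, "millis"))              // timeout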

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/java/OptionHelper.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/OptionHelper.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/OptionHelper.scala
new file mode 100644
index 0000000..b6d174b
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/OptionHelper.scala
@@ -0,0 +1,29 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.store.java
+
+/** Used by Java-based engines to construct Scala's Some and None */
+object OptionHelper {
+  /** Mimics a None from a Java-based engine */
+  def none[T]: Option[T] = {
+    None
+  }
+
+  /** Mimics a Some from a Java-based engine */
+  def some[T](value: T): Option[T] = {
+    Some(value)
+  }
+}
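
    [editor's note] In Scala terms, these helpers behave as follows; a Java caller
    would write OptionHelper.<String>none() and OptionHelper.some("web") when
    filling in the Option parameters of the Java-friendly stores above:

        // Equivalent to None and Some("web") respectively.
        val noChannel: Option[String] = OptionHelper.none[String]
        val webChannel: Option[String] = OptionHelper.some("web")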

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/java/PJavaEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/PJavaEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/PJavaEventStore.scala
new file mode 100644
index 0000000..c47032c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/PJavaEventStore.scala
@@ -0,0 +1,109 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.store.java
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PropertyMap
+import org.apache.predictionio.data.store.PEventStore
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaRDD
+import org.joda.time.DateTime
+
+import scala.collection.JavaConversions
+
+/** This Java-friendly object provides a set of operations to access the
+  * Event Store with Spark's parallelization.
+  */
+object PJavaEventStore {
+
+  /** Read events from Event Store
+    *
+    * @param appName return events of this app
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param sc Spark context
+    * @return JavaRDD[Event]
+    */
+  def find(
+    appName: String,
+    channelName: Option[String],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    entityType: Option[String],
+    entityId: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    sc: SparkContext): JavaRDD[Event] = {
+
+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+
+    PEventStore.find(
+      appName,
+      channelName,
+      startTime,
+      untilTime,
+      entityType,
+      entityId,
+      eventNamesSeq,
+      targetEntityType,
+      targetEntityId
+    )(sc)
+  }
+
+  /** Aggregate properties of entities based on these special events:
+    * \$set, \$unset, \$delete events.
+    *
+    * @param appName use events of this app
+    * @param entityType aggregate properties of the entities of this entityType
+    * @param channelName use events of this channel (default channel if it's None)
+    * @param startTime use events with eventTime >= startTime
+    * @param untilTime use events with eventTime < untilTime
+    * @param required only keep entities with these required properties defined
+    * @param sc Spark context
+    * @return JavaRDD[(String, PropertyMap)] JavaRDD of entityId and PropertyMap pairs
+    */
+  def aggregateProperties(
+    appName: String,
+    entityType: String,
+    channelName: Option[String],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    required: Option[java.util.List[String]],
+    sc: SparkContext): JavaRDD[(String, PropertyMap)] = {
+
+    val requiredSeq = required.map(JavaConversions.asScalaBuffer(_).toSeq)
+
+    PEventStore.aggregateProperties(
+      appName,
+      entityType,
+      channelName,
+      startTime,
+      untilTime,
+      requiredSeq
+    )(sc)
+  }
+
+}
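
    [editor's note] A sketch of the aggregation call with the required-properties
    filter, which keeps only entities defining every listed property; the app,
    entity type, and property names are hypothetical, and `sc` is assumed to be a
    SparkContext in scope:

        import java.util.Arrays
        import org.apache.predictionio.data.storage.PropertyMap
        import org.apache.spark.api.java.JavaRDD

        val users: JavaRDD[(String, PropertyMap)] =
          PJavaEventStore.aggregateProperties(
            "MyApp",                              // appName
            "user",                               // entityType
            None,                                 // channelName
            None,                                 // startTime
            None,                                 // untilTime
            Some(Arrays.asList("age", "gender")), // required
            sc)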

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/package.scala b/data/src/main/scala/org/apache/predictionio/data/store/package.scala
new file mode 100644
index 0000000..36c592f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/package.scala
@@ -0,0 +1,21 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data
+
+/** Provides high-level interfaces to the Event Store from within a prediction
+  * engine.
+  */
+package object store {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
new file mode 100644
index 0000000..31937d5
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
@@ -0,0 +1,110 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.view
+
+import org.apache.predictionio.annotation.Experimental
+import org.apache.predictionio.data.storage.Event
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.SQLContext
+import org.joda.time.DateTime
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe._
+import scala.util.hashing.MurmurHash3
+
+/**
+ * :: Experimental ::
+ */
+@Experimental
+object DataView {
+  /**
+    * :: Experimental ::
+    *
+    * Create a DataFrame from events of a specified app.
+    *
+    * @param appName return events of this app
+    * @param channelName use events of this channel (default channel if it's None)
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param conversionFunction a function that turns raw Events into events of interest.
+    *                           If conversionFunction returns None, such events are dropped.
+    * @param name identify the DataFrame created
+    * @param version used to track changes to the conversionFunction,
+    *                e.g. version = "20150413"; update it whenever the function changes.
+    * @param sqlContext SQL context
+    * @tparam E the output type of the conversion function. The type needs to extend Product
+    *           (e.g. case class)
+    * @return a DataFrame of events
+    */
+  @Experimental
+  def create[E <: Product: TypeTag: ClassTag](
+    appName: String,
+    channelName: Option[String] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    conversionFunction: Event => Option[E],
+    name: String = "",
+    version: String = "")(sqlContext: SQLContext): DataFrame = {
+
+    @transient lazy val logger = Logger[this.type]
+
+    val sc = sqlContext.sparkContext
+
+    val beginTime = startTime match {
+      case Some(t) => t
+      case None => new DateTime(0L)
+    }
+    val endTime = untilTime match {
+      case Some(t) => t
+      case None => DateTime.now() // pin the current time so the cache key below is stable
+    }
+    // detect changes to the case class
+    val uid = java.io.ObjectStreamClass.lookup(implicitly[reflect.ClassTag[E]].runtimeClass)
+        .getSerialVersionUID
+    val hash = MurmurHash3.stringHash(s"$beginTime-$endTime-$version-$uid")
+    val baseDir = s"${sys.env("PIO_FS_BASEDIR")}/view"
+    val fileName = s"$baseDir/$name-$appName-$hash.parquet"
+    try {
+      sqlContext.parquetFile(fileName)
+    } catch {
+      case e: java.io.FileNotFoundException =>
+        logger.info("Cached copy not found, reading from DB.")
+        // no cached copy; read from the event store and cache the result as Parquet
+        val result: RDD[E] = PEventStore.find(
+            appName = appName,
+            channelName = channelName,
+            startTime = startTime,
+            untilTime = Some(endTime))(sc)
+          .flatMap((e) => conversionFunction(e))
+        import sqlContext.implicits._ // needed for RDD.toDF()
+        val resultDF = result.toDF()
+
+        resultDF.saveAsParquetFile(fileName)
+        sqlContext.parquetFile(fileName)
+      case e: java.lang.RuntimeException =>
+        if (e.toString.contains("is not a Parquet file")) {
+          logger.error(s"$fileName does not contain a valid Parquet file. " +
+            "Please delete it and try again.")
+        }
+        throw e
+    }
+  }
+}
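
    [editor's note] A minimal sketch of DataView.create, assuming a hypothetical
    "view" event and case class; events for which the conversion function returns
    None are dropped, and the resulting DataFrame is cached as Parquet under
    PIO_FS_BASEDIR:

        import org.apache.predictionio.data.storage.Event
        import org.apache.predictionio.data.view.DataView
        import org.apache.spark.sql.{DataFrame, SQLContext}

        case class ViewRecord(user: String, item: String, t: Long)

        def buildViews(sqlContext: SQLContext): DataFrame = DataView.create(
          appName = "MyApp",
          conversionFunction = (e: Event) =>
            if (e.event == "view") {
              Some(ViewRecord(
                e.entityId,
                e.targetEntityId.getOrElse(""),
                e.eventTime.getMillis))
            } else {
              None
            },
          name = "views",
          version = "20160718") (sqlContext)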