Posted to commits@predictionio.apache.org by do...@apache.org on 2017/03/08 07:45:57 UTC

[2/7] incubator-predictionio git commit: [PIO-49] Add support for Elasticsearch 5

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala
new file mode 100644
index 0000000..de74d46
--- /dev/null
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase.upgrade
+
+import org.apache.predictionio.annotation.Experimental
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.hbase.HBLEvents
+import org.apache.predictionio.data.storage.hbase.HBEventsUtil
+
+import scala.collection.JavaConversions._
+
+import scala.concurrent._
+import ExecutionContext.Implicits.global
+import org.apache.predictionio.data.storage.LEvents
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import java.lang.Thread
+
+object CheckDistribution {
+  def entityType(eventClient: LEvents, appId: Int)
+  : Map[(String, Option[String]), Int] = {
+    eventClient
+    .find(appId = appId)
+    .foldLeft(Map[(String, Option[String]), Int]().withDefaultValue(0)) {
+      case (m, e) => {
+        val k = (e.entityType, e.targetEntityType)
+        m.updated(k, m(k) + 1)
+      }
+    }
+  }
+
+  def runMain(appId: Int) {
+    val eventClient = Storage.getLEvents().asInstanceOf[HBLEvents]
+
+    entityType(eventClient, appId)
+    .toSeq
+    .sortBy(-_._2)
+    .foreach { println }
+
+  }
+
+  def main(args: Array[String]) {
+    runMain(args(0).toInt)
+  }
+
+}
+
+/** :: Experimental :: */
+@Experimental
+object Upgrade_0_8_3 {
+  val NameMap = Map(
+    "pio_user" -> "user",
+    "pio_item" -> "item")
+  val RevNameMap = NameMap.toSeq.map(_.swap).toMap
+
+  val logger = Logger[this.type]
+
+  def main(args: Array[String]) {
+    val fromAppId = args(0).toInt
+    val toAppId = args(1).toInt
+
+    runMain(fromAppId, toAppId)
+  }
+
+  def runMain(fromAppId: Int, toAppId: Int): Unit = {
+    upgrade(fromAppId, toAppId)
+  }
+
+
+  val obsEntityTypes = Set("pio_user", "pio_item")
+  val obsProperties = Set(
+    "pio_itypes", "pio_starttime", "pio_endtime",
+    "pio_inactive", "pio_price", "pio_rating")
+
+  def hasPIOPrefix(eventClient: LEvents, appId: Int): Boolean = {
+    eventClient.find(appId = appId).filter( e =>
+      (obsEntityTypes.contains(e.entityType) ||
+       e.targetEntityType.map(obsEntityTypes.contains(_)).getOrElse(false) ||
+       (!e.properties.keySet.forall(!obsProperties.contains(_)))
+      )
+    ).hasNext
+  }
+
+  def isEmpty(eventClient: LEvents, appId: Int): Boolean =
+    !eventClient.find(appId = appId).hasNext
+
+
+  def upgradeCopy(eventClient: LEvents, fromAppId: Int, toAppId: Int) {
+    val fromDist = CheckDistribution.entityType(eventClient, fromAppId)
+
+    logger.info("FromAppId Distribution")
+    fromDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) }
+
+    val events = eventClient
+    .find(appId = fromAppId)
+    .zipWithIndex
+    .foreach { case (fromEvent, index) => {
+      if (index % 50000 == 0) {
+        // logger.info(s"Progress: $fromEvent $index")
+        logger.info(s"Progress: $index")
+      }
+
+
+      val fromEntityType = fromEvent.entityType
+      val toEntityType = NameMap.getOrElse(fromEntityType, fromEntityType)
+
+      val fromTargetEntityType = fromEvent.targetEntityType
+      val toTargetEntityType = fromTargetEntityType
+        .map { et => NameMap.getOrElse(et, et) }
+
+      val toProperties = DataMap(fromEvent.properties.fields.map {
+        case (k, v) =>
+          val newK = if (obsProperties.contains(k)) {
+            val nK = k.stripPrefix("pio_")
+            logger.info(s"property ${k} will be renamed to ${nK}")
+            nK
+          } else k
+          (newK, v)
+      })
+
+      val toEvent = fromEvent.copy(
+        entityType = toEntityType,
+        targetEntityType = toTargetEntityType,
+        properties = toProperties)
+
+      eventClient.insert(toEvent, toAppId)
+    }}
+
+
+    val toDist = CheckDistribution.entityType(eventClient, toAppId)
+
+    logger.info("Recap fromAppId Distribution")
+    fromDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) }
+
+    logger.info("ToAppId Distribution")
+    toDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) }
+
+    val fromGood = fromDist
+      .toSeq
+      .forall { case (k, c) => {
+        val (et, tet) = k
+        val net = NameMap.getOrElse(et, et)
+        val ntet = tet.map(tet => NameMap.getOrElse(tet, tet))
+        val nk = (net, ntet)
+        val nc = toDist.getOrElse(nk, -1)
+        val checkMatch = (c == nc)
+        if (!checkMatch) {
+          logger.info(s"${k} doesn't match: old has ${c}. new has ${nc}.")
+        }
+        checkMatch
+      }}
+
+    val toGood = toDist
+      .toSeq
+      .forall { case (k, c) => {
+        val (et, tet) = k
+        val oet = RevNameMap.getOrElse(et, et)
+        val otet = tet.map(tet => RevNameMap.getOrElse(tet, tet))
+        val ok = (oet, otet)
+        val oc = fromDist.getOrElse(ok, -1)
+        val checkMatch = (c == oc)
+        if (!checkMatch) {
+          logger.info(s"${k} doesn't match: new has ${c}. old has ${oc}.")
+        }
+        checkMatch
+      }}
+
+    if (!fromGood || !toGood) {
+      logger.error("Doesn't match!! There is an import error.")
+    } else {
+      logger.info("Count matches. Looks like we are good to go.")
+    }
+  }
+
+  /* For upgrade from 0.8.2 to 0.8.3 only */
+  def upgrade(fromAppId: Int, toAppId: Int) {
+
+    val eventClient = Storage.getLEvents().asInstanceOf[HBLEvents]
+
+    require(fromAppId != toAppId,
+      s"FromAppId: $fromAppId must be different from toAppId: $toAppId")
+
+    if (hasPIOPrefix(eventClient, fromAppId)) {
+      require(
+        isEmpty(eventClient, toAppId),
+        s"Target appId: $toAppId is not empty. Please run " +
+        "`pio app data-delete <app_name>` to clean the data before upgrading")
+
+      logger.info(s"$fromAppId isEmpty: " + isEmpty(eventClient, fromAppId))
+
+      upgradeCopy(eventClient, fromAppId, toAppId)
+
+    } else {
+      logger.info(s"From appId: ${fromAppId} doesn't contain"
+        + s" obsolete entityTypes ${obsEntityTypes} or"
+        + s" obsolete properties ${obsProperties}."
+        + " No need data migration."
+        + s" You can continue to use appId ${fromAppId}.")
+    }
+
+    logger.info("Done.")
+  }
+
+
+}
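
For reference, the upgrade tool above is driven by two positional app IDs. Below is a minimal sketch of invoking it programmatically, assuming HBase storage is configured the same way the pio command line would configure it; the wrapper object and the app IDs are illustrative only, not part of the commit.

import org.apache.predictionio.data.storage.hbase.upgrade.Upgrade_0_8_3

object UpgradeExample {
  def main(args: Array[String]): Unit = {
    val fromAppId = 1 // app populated under 0.8.2 (pio_-prefixed entity types)
    val toAppId   = 2 // freshly created, empty target app for 0.8.3
    Upgrade_0_8_3.runMain(fromAppId, toAppId)
  }
}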

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
new file mode 100644
index 0000000..b453820
--- /dev/null
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.view
+
+import org.apache.predictionio.data.storage.hbase.HBPEvents
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Storage
+
+import org.joda.time.DateTime
+
+import org.json4s.JValue
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+
+// each JValue is stored with the time at which it was set
+private[predictionio] case class PropTime(val d: JValue, val t: Long) extends Serializable
+
+private[predictionio] case class SetProp (
+  val fields: Map[String, PropTime],
+  // last set time. Note: fields could be empty with valid set time
+  val t: Long) extends Serializable {
+
+  def ++ (that: SetProp): SetProp = {
+    val commonKeys = fields.keySet.intersect(that.fields.keySet)
+
+    val common: Map[String, PropTime] = commonKeys.map { k =>
+      val thisData = this.fields(k)
+      val thatData = that.fields(k)
+      // only keep the value with latest time
+      val v = if (thisData.t > thatData.t) thisData else thatData
+      (k, v)
+    }.toMap
+
+    val combinedFields = common ++
+      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
+
+    // keep the latest set time
+    val combinedT = if (this.t > that.t) this.t else that.t
+
+    SetProp(
+      fields = combinedFields,
+      t = combinedT
+    )
+  }
+}
+
+private[predictionio] case class UnsetProp (fields: Map[String, Long]) extends Serializable {
+  def ++ (that: UnsetProp): UnsetProp = {
+    val commonKeys = fields.keySet.intersect(that.fields.keySet)
+
+    val common: Map[String, Long] = commonKeys.map { k =>
+      val thisData = this.fields(k)
+      val thatData = that.fields(k)
+      // only keep the value with latest time
+      val v = if (thisData > thatData) thisData else thatData
+      (k, v)
+    }.toMap
+
+    val combinedFields = common ++
+      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
+
+    UnsetProp(
+      fields = combinedFields
+    )
+  }
+}
+
+private[predictionio] case class DeleteEntity (t: Long) extends Serializable {
+  def ++ (that: DeleteEntity): DeleteEntity = {
+    if (this.t > that.t) this else that
+  }
+}
+
+private[predictionio] case class EventOp (
+  val setProp: Option[SetProp] = None,
+  val unsetProp: Option[UnsetProp] = None,
+  val deleteEntity: Option[DeleteEntity] = None
+) extends Serializable {
+
+  def ++ (that: EventOp): EventOp = {
+    EventOp(
+      setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
+      unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
+      deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _)
+    )
+  }
+
+  def toDataMap(): Option[DataMap] = {
+    setProp.flatMap { set =>
+
+      val unsetKeys: Set[String] = unsetProp.map( unset =>
+        unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet
+      ).getOrElse(Set())
+
+      val combinedFields = deleteEntity.map { delete =>
+        if (delete.t >= set.t) {
+          None
+        } else {
+          val deleteKeys: Set[String] = set.fields
+            .filter { case (k, PropTime(kv, t)) =>
+              (delete.t >= t)
+            }.keySet
+          Some(set.fields -- unsetKeys -- deleteKeys)
+        }
+      }.getOrElse{
+        Some(set.fields -- unsetKeys)
+      }
+
+      // Note: mapValues() doesn't return concrete Map and causes
+      // NotSerializableException issue. Use map(identity) to work around this.
+      // see https://issues.scala-lang.org/browse/SI-7005
+      combinedFields.map(f => DataMap(f.mapValues(_.d).map(identity)))
+    }
+  }
+
+}
+
+private[predictionio] object EventOp {
+  def apply(e: Event): EventOp = {
+    val t = e.eventTime.getMillis
+    e.event match {
+      case "$set" => {
+        val fields = e.properties.fields.mapValues(jv =>
+          PropTime(jv, t)
+        ).map(identity)
+
+        EventOp(
+          setProp = Some(SetProp(fields = fields, t = t))
+        )
+      }
+      case "$unset" => {
+        val fields = e.properties.fields.mapValues(jv => t).map(identity)
+        EventOp(
+          unsetProp = Some(UnsetProp(fields = fields))
+        )
+      }
+      case "$delete" => {
+        EventOp(
+          deleteEntity = Some(DeleteEntity(t))
+        )
+      }
+      case _ => {
+        EventOp()
+      }
+    }
+  }
+}
+
+@deprecated("Use PEvents or PEventStore instead.", "0.9.2")
+class PBatchView(
+  val appId: Int,
+  val startTime: Option[DateTime],
+  val untilTime: Option[DateTime],
+  val sc: SparkContext) {
+
+  // NOTE: parallel Events DB interface
+  @transient lazy val eventsDb = Storage.getPEvents()
+
+  @transient lazy val _events: RDD[Event] =
+    eventsDb.getByAppIdAndTimeAndEntity(
+      appId = appId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = None,
+      entityId = None)(sc)
+
+  // TODO: change to use EventSeq?
+  @transient lazy val events: RDD[Event] = _events
+
+  def aggregateProperties(
+    entityType: String,
+    startTimeOpt: Option[DateTime] = None,
+    untilTimeOpt: Option[DateTime] = None
+  ): RDD[(String, DataMap)] = {
+
+    _events
+      .filter( e => ((e.entityType == entityType) &&
+        (EventValidation.isSpecialEvents(e.event))) )
+      .map( e => (e.entityId, EventOp(e) ))
+      .aggregateByKey[EventOp](EventOp())(
+        // within same partition
+        seqOp = { case (u, v) => u ++ v },
+        // across partition
+        combOp = { case (accu, u) => accu ++ u }
+      )
+      .mapValues(_.toDataMap)
+      .filter{ case (k, v) => v.isDefined }
+      .map{ case (k, v) => (k, v.get) }
+  }
+
+}
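
Although the class is deprecated in favor of PEvents/PEventStore, aggregateProperties is the piece of PBatchView most callers touch. A minimal usage sketch follows, assuming a local SparkContext and an app whose events include $set/$unset/$delete; the appId and entity type are placeholders, not values from the commit.

import org.apache.predictionio.data.view.PBatchView
import org.apache.spark.{SparkConf, SparkContext}

object PBatchViewExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("pbatchview-example").setMaster("local[*]"))
    val view = new PBatchView(appId = 1, startTime = None, untilTime = None, sc = sc)
    // Latest aggregated properties per entityId, after applying $set/$unset/$delete in time order
    val userProps = view.aggregateProperties("user")
    userProps.take(5).foreach(println)
    sc.stop()
  }
}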

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/storage/hbase/src/test/resources/application.conf b/storage/hbase/src/test/resources/application.conf
new file mode 100644
index 0000000..eecae44
--- /dev/null
+++ b/storage/hbase/src/test/resources/application.conf
@@ -0,0 +1,28 @@
+org.apache.predictionio.data.storage {
+  sources {
+    mongodb {
+      type = mongodb
+      hosts = [localhost]
+      ports = [27017]
+    }
+    elasticsearch {
+      type = elasticsearch
+      hosts = [localhost]
+      ports = [9300]
+    }
+  }
+  repositories {
+    # This section is dummy just to make storage happy.
+    # The actual testing will not bypass these repository settings completely.
+    # Please refer to StorageTestUtils.scala.
+    settings {
+      name = "test_predictionio"
+      source = mongodb
+    }
+
+    appdata {
+      name = "test_predictionio_appdata"
+      source = mongodb
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/.gitignore
----------------------------------------------------------------------
diff --git a/storage/hdfs/.gitignore b/storage/hdfs/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/storage/hdfs/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/build.sbt
----------------------------------------------------------------------
diff --git a/storage/hdfs/build.sbt b/storage/hdfs/build.sbt
new file mode 100644
index 0000000..9f064c6
--- /dev/null
+++ b/storage/hdfs/build.sbt
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "apache-predictionio-data-hdfs"
+
+libraryDependencies ++= Seq(
+  "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
+  "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided",
+  "org.scalatest"           %% "scalatest"      % "2.1.7" % "test",
+  "org.specs2"              %% "specs2"         % "2.3.13" % "test")
+
+parallelExecution in Test := false
+
+pomExtra := childrenPomExtra.value
+
+assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true)
+
+assemblyMergeStrategy in assembly := {
+  case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
+  case PathList("META-INF", "NOTICE.txt")  => MergeStrategy.concat
+  case x =>
+    val oldStrategy = (assemblyMergeStrategy in assembly).value
+    oldStrategy(x)
+}
+
+// skip test in assembly
+test in assembly := {}
+
+outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-hdfs-assembly-" + version.value + ".jar")
+

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala
----------------------------------------------------------------------
diff --git a/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala
new file mode 100644
index 0000000..08dfb01
--- /dev/null
+++ b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hdfs
+
+import java.io.IOException
+
+import com.google.common.io.ByteStreams
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+class HDFSModels(fs: FileSystem, config: StorageClientConfig, prefix: String)
+  extends Models with Logging {
+
+  def insert(i: Model): Unit = {
+    try {
+      val fsdos = fs.create(new Path(s"$prefix${i.id}"))
+      fsdos.write(i.models)
+      fsdos.close
+    } catch {
+      case e: IOException => error(e.getMessage)
+    }
+  }
+
+  def get(id: String): Option[Model] = {
+    try {
+      val p = new Path(s"$prefix$id")
+      Some(Model(
+        id = id,
+        models = ByteStreams.toByteArray(fs.open(p))))
+    } catch {
+      case e: Throwable =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def delete(id: String): Unit = {
+    val p = new Path(s"$prefix$id")
+    if (!fs.delete(p, false)) {
+      error(s"Unable to delete ${fs.makeQualified(p).toString}!")
+    }
+  }
+}
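
HDFSModels persists each serialized model as a single file under the configured prefix. A minimal sketch follows, assuming the local Hadoop configuration and that StorageClientConfig can be built with its defaults; the prefix and model id are placeholders.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.predictionio.data.storage.{Model, StorageClientConfig}
import org.apache.predictionio.data.storage.hdfs.HDFSModels

object HDFSModelsExample {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val models = new HDFSModels(fs, StorageClientConfig(), "models/")
    models.insert(Model(id = "example-model", models = Array[Byte](1, 2, 3)))
    val loaded = models.get("example-model") // Option[Model]
    println(loaded.map(_.models.length))     // Some(3) if the write succeeded
    models.delete("example-model")
  }
}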

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala
----------------------------------------------------------------------
diff --git a/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala
new file mode 100644
index 0000000..bc57f2a
--- /dev/null
+++ b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hdfs
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+    with Logging {
+  override val prefix = "HDFS"
+  val conf = new Configuration
+  val fs = FileSystem.get(conf)
+  fs.setWorkingDirectory(
+    new Path(config.properties.getOrElse("PATH", config.properties("HOSTS"))))
+  val client = fs
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala
----------------------------------------------------------------------
diff --git a/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala
new file mode 100644
index 0000000..a927d78
--- /dev/null
+++ b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage
+
+/** HDFS implementation of storage traits, supporting model data only
+  *
+  * @group Implementation
+  */
+package object hdfs {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/storage/hdfs/src/test/resources/application.conf b/storage/hdfs/src/test/resources/application.conf
new file mode 100644
index 0000000..eecae44
--- /dev/null
+++ b/storage/hdfs/src/test/resources/application.conf
@@ -0,0 +1,28 @@
+org.apache.predictionio.data.storage {
+  sources {
+    mongodb {
+      type = mongodb
+      hosts = [localhost]
+      ports = [27017]
+    }
+    elasticsearch {
+      type = elasticsearch
+      hosts = [localhost]
+      ports = [9300]
+    }
+  }
+  repositories {
+    # This section is dummy just to make storage happy.
+    # The actual testing will not bypass these repository settings completely.
+    # Please refer to StorageTestUtils.scala.
+    settings {
+      name = "test_predictionio"
+      source = mongodb
+    }
+
+    appdata {
+      name = "test_predictionio_appdata"
+      source = mongodb
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/.gitignore
----------------------------------------------------------------------
diff --git a/storage/jdbc/.gitignore b/storage/jdbc/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/storage/jdbc/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/build.sbt
----------------------------------------------------------------------
diff --git a/storage/jdbc/build.sbt b/storage/jdbc/build.sbt
new file mode 100644
index 0000000..63d420b
--- /dev/null
+++ b/storage/jdbc/build.sbt
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "apache-predictionio-data-jdbc"
+
+libraryDependencies ++= Seq(
+  "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
+  "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided",
+  "org.apache.spark"        %% "spark-sql"      % sparkVersion.value % "provided",
+  "org.postgresql"           % "postgresql"     % "9.4-1204-jdbc41",
+  "org.scalikejdbc"         %% "scalikejdbc"    % "2.3.5",
+  "org.scalatest"           %% "scalatest"      % "2.1.7" % "test",
+  "org.specs2"              %% "specs2"         % "2.3.13" % "test")
+
+parallelExecution in Test := false
+
+pomExtra := childrenPomExtra.value
+
+assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true)
+
+assemblyMergeStrategy in assembly := {
+  case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
+  case PathList("META-INF", "NOTICE.txt")  => MergeStrategy.concat
+  case x =>
+    val oldStrategy = (assemblyMergeStrategy in assembly).value
+    oldStrategy(x)
+}
+
+// skip test in assembly
+test in assembly := {}
+
+outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-jdbc-assembly-" + version.value + ".jar")
+

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala
new file mode 100644
index 0000000..437f8ae
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.AccessKey
+import org.apache.predictionio.data.storage.AccessKeys
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+import scala.util.Random
+
+/** JDBC implementation of [[AccessKeys]] */
+class JDBCAccessKeys(client: String, config: StorageClientConfig, prefix: String)
+  extends AccessKeys with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "accesskeys")
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      accesskey varchar(64) not null primary key,
+      appid integer not null,
+      events text)""".execute().apply()
+  }
+
+  def insert(accessKey: AccessKey): Option[String] = DB localTx { implicit s =>
+    val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
+    val events = if (accessKey.events.isEmpty) None else Some(accessKey.events.mkString(","))
+    sql"""
+    insert into $tableName values(
+      $key,
+      ${accessKey.appid},
+      $events)""".update().apply()
+    Some(key)
+  }
+
+  def get(key: String): Option[AccessKey] = DB readOnly { implicit session =>
+    sql"SELECT accesskey, appid, events FROM $tableName WHERE accesskey = $key".
+      map(resultToAccessKey).single().apply()
+  }
+
+  def getAll(): Seq[AccessKey] = DB readOnly { implicit session =>
+    sql"SELECT accesskey, appid, events FROM $tableName".map(resultToAccessKey).list().apply()
+  }
+
+  def getByAppid(appid: Int): Seq[AccessKey] = DB readOnly { implicit session =>
+    sql"SELECT accesskey, appid, events FROM $tableName WHERE appid = $appid".
+      map(resultToAccessKey).list().apply()
+  }
+
+  def update(accessKey: AccessKey): Unit = DB localTx { implicit session =>
+    val events = if (accessKey.events.isEmpty) None else Some(accessKey.events.mkString(","))
+    sql"""
+    UPDATE $tableName SET
+      appid = ${accessKey.appid},
+      events = $events
+    WHERE accesskey = ${accessKey.key}""".update().apply()
+  }
+
+  def delete(key: String): Unit = DB localTx { implicit session =>
+    sql"DELETE FROM $tableName WHERE accesskey = $key".update().apply()
+  }
+
+  /** Convert JDBC results to [[AccessKey]] */
+  def resultToAccessKey(rs: WrappedResultSet): AccessKey = {
+    AccessKey(
+      key = rs.string("accesskey"),
+      appid = rs.int("appid"),
+      events = rs.stringOpt("events").map(_.split(",").toSeq).getOrElse(Nil))
+  }
+}
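
JDBCAccessKeys creates its table on construction and generates a key when an empty one is supplied. A minimal sketch against PostgreSQL follows, assuming a reachable database and that StorageClientConfig can be built with its defaults; the JDBC URL, credentials, and table prefix are placeholders.

import org.apache.predictionio.data.storage.{AccessKey, StorageClientConfig}
import org.apache.predictionio.data.storage.jdbc.JDBCAccessKeys
import scalikejdbc.ConnectionPool

object JDBCAccessKeysExample {
  def main(args: Array[String]): Unit = {
    Class.forName("org.postgresql.Driver")
    ConnectionPool.singleton("jdbc:postgresql://localhost/pio", "pio", "pio")
    val accessKeys = new JDBCAccessKeys("unused", StorageClientConfig(), "pio")
    // An empty key asks the DAO to generate one; events restrict what the key may record
    val key = accessKeys.insert(AccessKey(key = "", appid = 1, events = Seq("view", "buy")))
    key.flatMap(accessKeys.get).foreach(println)
  }
}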

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala
new file mode 100644
index 0000000..17e6410
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.App
+import org.apache.predictionio.data.storage.Apps
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[Apps]] */
+class JDBCApps(client: String, config: StorageClientConfig, prefix: String)
+  extends Apps with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "apps")
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      id serial not null primary key,
+      name text not null,
+      description text)""".execute.apply()
+  }
+
+  def insert(app: App): Option[Int] = DB localTx { implicit session =>
+    val q = if (app.id == 0) {
+      sql"""
+      insert into $tableName (name, description) values(${app.name}, ${app.description})
+      """
+    } else {
+      sql"""
+      insert into $tableName values(${app.id}, ${app.name}, ${app.description})
+      """
+    }
+    Some(q.updateAndReturnGeneratedKey().apply().toInt)
+  }
+
+  def get(id: Int): Option[App] = DB readOnly { implicit session =>
+    sql"SELECT id, name, description FROM $tableName WHERE id = ${id}".map(rs =>
+      App(
+        id = rs.int("id"),
+        name = rs.string("name"),
+        description = rs.stringOpt("description"))
+    ).single().apply()
+  }
+
+  def getByName(name: String): Option[App] = DB readOnly { implicit session =>
+    sql"SELECT id, name, description FROM $tableName WHERE name = ${name}".map(rs =>
+      App(
+        id = rs.int("id"),
+        name = rs.string("name"),
+        description = rs.stringOpt("description"))
+    ).single().apply()
+  }
+
+  def getAll(): Seq[App] = DB readOnly { implicit session =>
+    sql"SELECT id, name, description FROM $tableName".map(rs =>
+      App(
+        id = rs.int("id"),
+        name = rs.string("name"),
+        description = rs.stringOpt("description"))
+    ).list().apply()
+  }
+
+  def update(app: App): Unit = DB localTx { implicit session =>
+    sql"""
+    update $tableName set name = ${app.name}, description = ${app.description}
+    where id = ${app.id}""".update().apply()
+  }
+
+  def delete(id: Int): Unit = DB localTx { implicit session =>
+    sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala
new file mode 100644
index 0000000..c9aaca5
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Channel
+import org.apache.predictionio.data.storage.Channels
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[Channels]] */
+class JDBCChannels(client: String, config: StorageClientConfig, prefix: String)
+  extends Channels with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "channels")
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      id serial not null primary key,
+      name text not null,
+      appid integer not null)""".execute().apply()
+  }
+
+  def insert(channel: Channel): Option[Int] = DB localTx { implicit session =>
+    val q = if (channel.id == 0) {
+      sql"INSERT INTO $tableName (name, appid) VALUES(${channel.name}, ${channel.appid})"
+    } else {
+      sql"INSERT INTO $tableName VALUES(${channel.id}, ${channel.name}, ${channel.appid})"
+    }
+    Some(q.updateAndReturnGeneratedKey().apply().toInt)
+  }
+
+  def get(id: Int): Option[Channel] = DB localTx { implicit session =>
+    sql"SELECT id, name, appid FROM $tableName WHERE id = $id".
+      map(resultToChannel).single().apply()
+  }
+
+  def getByAppid(appid: Int): Seq[Channel] = DB localTx { implicit session =>
+    sql"SELECT id, name, appid FROM $tableName WHERE appid = $appid".
+      map(resultToChannel).list().apply()
+  }
+
+  def delete(id: Int): Unit = DB localTx { implicit session =>
+    sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+  }
+
+  def resultToChannel(rs: WrappedResultSet): Channel = {
+    Channel(
+      id = rs.int("id"),
+      name = rs.string("name"),
+      appid = rs.int("appid"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala
new file mode 100644
index 0000000..13c374d
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineInstance
+import org.apache.predictionio.data.storage.EngineInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[EngineInstances]] */
+class JDBCEngineInstances(client: String, config: StorageClientConfig, prefix: String)
+  extends EngineInstances with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "engineinstances")
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      id varchar(100) not null primary key,
+      status text not null,
+      startTime timestamp DEFAULT CURRENT_TIMESTAMP,
+      endTime timestamp DEFAULT CURRENT_TIMESTAMP,
+      engineId text not null,
+      engineVersion text not null,
+      engineVariant text not null,
+      engineFactory text not null,
+      batch text not null,
+      env text not null,
+      sparkConf text not null,
+      datasourceParams text not null,
+      preparatorParams text not null,
+      algorithmsParams text not null,
+      servingParams text not null)""".execute().apply()
+  }
+
+  def insert(i: EngineInstance): String = DB localTx { implicit session =>
+    val id = java.util.UUID.randomUUID().toString
+    sql"""
+    INSERT INTO $tableName VALUES(
+      $id,
+      ${i.status},
+      ${i.startTime},
+      ${i.endTime},
+      ${i.engineId},
+      ${i.engineVersion},
+      ${i.engineVariant},
+      ${i.engineFactory},
+      ${i.batch},
+      ${JDBCUtils.mapToString(i.env)},
+      ${JDBCUtils.mapToString(i.sparkConf)},
+      ${i.dataSourceParams},
+      ${i.preparatorParams},
+      ${i.algorithmsParams},
+      ${i.servingParams})""".update().apply()
+    id
+  }
+
+  def get(id: String): Option[EngineInstance] = DB localTx { implicit session =>
+    sql"""
+    SELECT
+      id,
+      status,
+      startTime,
+      endTime,
+      engineId,
+      engineVersion,
+      engineVariant,
+      engineFactory,
+      batch,
+      env,
+      sparkConf,
+      datasourceParams,
+      preparatorParams,
+      algorithmsParams,
+      servingParams
+    FROM $tableName WHERE id = $id""".map(resultToEngineInstance).
+      single().apply()
+  }
+
+  def getAll(): Seq[EngineInstance] = DB localTx { implicit session =>
+    sql"""
+    SELECT
+      id,
+      status,
+      startTime,
+      endTime,
+      engineId,
+      engineVersion,
+      engineVariant,
+      engineFactory,
+      batch,
+      env,
+      sparkConf,
+      datasourceParams,
+      preparatorParams,
+      algorithmsParams,
+      servingParams
+    FROM $tableName""".map(resultToEngineInstance).list().apply()
+  }
+
+  def getLatestCompleted(
+    engineId: String,
+    engineVersion: String,
+    engineVariant: String): Option[EngineInstance] =
+    getCompleted(engineId, engineVersion, engineVariant).headOption
+
+  def getCompleted(
+    engineId: String,
+    engineVersion: String,
+    engineVariant: String): Seq[EngineInstance] = DB localTx { implicit s =>
+    sql"""
+    SELECT
+      id,
+      status,
+      startTime,
+      endTime,
+      engineId,
+      engineVersion,
+      engineVariant,
+      engineFactory,
+      batch,
+      env,
+      sparkConf,
+      datasourceParams,
+      preparatorParams,
+      algorithmsParams,
+      servingParams
+    FROM $tableName
+    WHERE
+      status = 'COMPLETED' AND
+      engineId = $engineId AND
+      engineVersion = $engineVersion AND
+      engineVariant = $engineVariant
+    ORDER BY startTime DESC""".
+      map(resultToEngineInstance).list().apply()
+  }
+
+  def update(i: EngineInstance): Unit = DB localTx { implicit session =>
+    sql"""
+    update $tableName set
+      status = ${i.status},
+      startTime = ${i.startTime},
+      endTime = ${i.endTime},
+      engineId = ${i.engineId},
+      engineVersion = ${i.engineVersion},
+      engineVariant = ${i.engineVariant},
+      engineFactory = ${i.engineFactory},
+      batch = ${i.batch},
+      env = ${JDBCUtils.mapToString(i.env)},
+      sparkConf = ${JDBCUtils.mapToString(i.sparkConf)},
+      datasourceParams = ${i.dataSourceParams},
+      preparatorParams = ${i.preparatorParams},
+      algorithmsParams = ${i.algorithmsParams},
+      servingParams = ${i.servingParams}
+    where id = ${i.id}""".update().apply()
+  }
+
+  def delete(id: String): Unit = DB localTx { implicit session =>
+    sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+  }
+
+  /** Convert JDBC results to [[EngineInstance]] */
+  def resultToEngineInstance(rs: WrappedResultSet): EngineInstance = {
+    EngineInstance(
+      id = rs.string("id"),
+      status = rs.string("status"),
+      startTime = rs.jodaDateTime("startTime"),
+      endTime = rs.jodaDateTime("endTime"),
+      engineId = rs.string("engineId"),
+      engineVersion = rs.string("engineVersion"),
+      engineVariant = rs.string("engineVariant"),
+      engineFactory = rs.string("engineFactory"),
+      batch = rs.string("batch"),
+      env = JDBCUtils.stringToMap(rs.string("env")),
+      sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")),
+      dataSourceParams = rs.string("datasourceParams"),
+      preparatorParams = rs.string("preparatorParams"),
+      algorithmsParams = rs.string("algorithmsParams"),
+      servingParams = rs.string("servingParams"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala
new file mode 100644
index 0000000..90eb5f3
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EvaluationInstance
+import org.apache.predictionio.data.storage.EvaluationInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementations of [[EvaluationInstances]] */
+class JDBCEvaluationInstances(client: String, config: StorageClientConfig, prefix: String)
+  extends EvaluationInstances with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "evaluationinstances")
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      id varchar(100) not null primary key,
+      status text not null,
+      startTime timestamp DEFAULT CURRENT_TIMESTAMP,
+      endTime timestamp DEFAULT CURRENT_TIMESTAMP,
+      evaluationClass text not null,
+      engineParamsGeneratorClass text not null,
+      batch text not null,
+      env text not null,
+      sparkConf text not null,
+      evaluatorResults text not null,
+      evaluatorResultsHTML text not null,
+      evaluatorResultsJSON text)""".execute().apply()
+  }
+
+  def insert(i: EvaluationInstance): String = DB localTx { implicit session =>
+    val id = java.util.UUID.randomUUID().toString
+    sql"""
+    INSERT INTO $tableName VALUES(
+      $id,
+      ${i.status},
+      ${i.startTime},
+      ${i.endTime},
+      ${i.evaluationClass},
+      ${i.engineParamsGeneratorClass},
+      ${i.batch},
+      ${JDBCUtils.mapToString(i.env)},
+      ${JDBCUtils.mapToString(i.sparkConf)},
+      ${i.evaluatorResults},
+      ${i.evaluatorResultsHTML},
+      ${i.evaluatorResultsJSON})""".update().apply()
+    id
+  }
+
+  def get(id: String): Option[EvaluationInstance] = DB localTx { implicit session =>
+    sql"""
+    SELECT
+      id,
+      status,
+      startTime,
+      endTime,
+      evaluationClass,
+      engineParamsGeneratorClass,
+      batch,
+      env,
+      sparkConf,
+      evaluatorResults,
+      evaluatorResultsHTML,
+      evaluatorResultsJSON
+    FROM $tableName WHERE id = $id
+    """.map(resultToEvaluationInstance).single().apply()
+  }
+
+  def getAll(): Seq[EvaluationInstance] = DB localTx { implicit session =>
+    sql"""
+    SELECT
+      id,
+      status,
+      startTime,
+      endTime,
+      evaluationClass,
+      engineParamsGeneratorClass,
+      batch,
+      env,
+      sparkConf,
+      evaluatorResults,
+      evaluatorResultsHTML,
+      evaluatorResultsJSON
+    FROM $tableName
+    """.map(resultToEvaluationInstance).list().apply()
+  }
+
+  def getCompleted(): Seq[EvaluationInstance] = DB localTx { implicit s =>
+    sql"""
+    SELECT
+      id,
+      status,
+      startTime,
+      endTime,
+      evaluationClass,
+      engineParamsGeneratorClass,
+      batch,
+      env,
+      sparkConf,
+      evaluatorResults,
+      evaluatorResultsHTML,
+      evaluatorResultsJSON
+    FROM $tableName
+    WHERE
+      status = 'EVALCOMPLETED'
+    ORDER BY starttime DESC
+    """.map(resultToEvaluationInstance).list().apply()
+  }
+
+  def update(i: EvaluationInstance): Unit = DB localTx { implicit session =>
+    sql"""
+    update $tableName set
+      status = ${i.status},
+      startTime = ${i.startTime},
+      endTime = ${i.endTime},
+      evaluationClass = ${i.evaluationClass},
+      engineParamsGeneratorClass = ${i.engineParamsGeneratorClass},
+      batch = ${i.batch},
+      env = ${JDBCUtils.mapToString(i.env)},
+      sparkConf = ${JDBCUtils.mapToString(i.sparkConf)},
+      evaluatorResults = ${i.evaluatorResults},
+      evaluatorResultsHTML = ${i.evaluatorResultsHTML},
+      evaluatorResultsJSON = ${i.evaluatorResultsJSON}
+    where id = ${i.id}""".update().apply()
+  }
+
+  def delete(id: String): Unit = DB localTx { implicit session =>
+    sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+  }
+
+  /** Convert JDBC results to [[EvaluationInstance]] */
+  def resultToEvaluationInstance(rs: WrappedResultSet): EvaluationInstance = {
+    EvaluationInstance(
+      id = rs.string("id"),
+      status = rs.string("status"),
+      startTime = rs.jodaDateTime("startTime"),
+      endTime = rs.jodaDateTime("endTime"),
+      evaluationClass = rs.string("evaluationClass"),
+      engineParamsGeneratorClass = rs.string("engineParamsGeneratorClass"),
+      batch = rs.string("batch"),
+      env = JDBCUtils.stringToMap(rs.string("env")),
+      sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")),
+      evaluatorResults = rs.string("evaluatorResults"),
+      evaluatorResultsHTML = rs.string("evaluatorResultsHTML"),
+      evaluatorResultsJSON = rs.string("evaluatorResultsJSON"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
new file mode 100644
index 0000000..dddef67
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+import org.json4s.JObject
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+import scalikejdbc._
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+/** JDBC implementation of [[LEvents]] */
+class JDBCLEvents(
+    client: String,
+    config: StorageClientConfig,
+    namespace: String) extends LEvents with Logging {
+  implicit private val formats = org.json4s.DefaultFormats
+
+  def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+
+    // To be indexable, the column must be a VARCHAR shorter than 255 characters
+    val useIndex = config.properties.contains("INDEX") &&
+      config.properties("INDEX").equalsIgnoreCase("enabled")
+
+    val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
+    val entityIdIndexName = s"idx_${tableName}_ei"
+    val entityTypeIndexName = s"idx_${tableName}_et"
+    DB autoCommit { implicit session =>
+      if (useIndex) {
+        SQL(s"""
+      create table if not exists $tableName (
+        id varchar(32) not null primary key,
+        event varchar(255) not null,
+        entityType varchar(255) not null,
+        entityId varchar(255) not null,
+        targetEntityType text,
+        targetEntityId text,
+        properties text,
+        eventTime timestamp DEFAULT CURRENT_TIMESTAMP,
+        eventTimeZone varchar(50) not null,
+        tags text,
+        prId text,
+        creationTime timestamp DEFAULT CURRENT_TIMESTAMP,
+        creationTimeZone varchar(50) not null)""").execute().apply()
+
+        // create index
+        SQL(s"create index $entityIdIndexName on $tableName (entityId)").execute().apply()
+        SQL(s"create index $entityTypeIndexName on $tableName (entityType)").execute().apply()
+      } else {
+        SQL(s"""
+      create table if not exists $tableName (
+        id varchar(32) not null primary key,
+        event text not null,
+        entityType text not null,
+        entityId text not null,
+        targetEntityType text,
+        targetEntityId text,
+        properties text,
+        eventTime timestamp DEFAULT CURRENT_TIMESTAMP,
+        eventTimeZone varchar(50) not null,
+        tags text,
+        prId text,
+        creationTime timestamp DEFAULT CURRENT_TIMESTAMP,
+        creationTimeZone varchar(50) not null)""").execute().apply()
+      }
+      true
+    }
+  }
+
+  def remove(appId: Int, channelId: Option[Int] = None): Boolean =
+    DB autoCommit { implicit session =>
+      SQL(s"""
+      drop table ${JDBCUtils.eventTableName(namespace, appId, channelId)}
+      """).execute().apply()
+      true
+    }
+
+  def close(): Unit = ConnectionPool.closeAll()
+
+  def futureInsert(event: Event, appId: Int, channelId: Option[Int])(
+    implicit ec: ExecutionContext): Future[String] = Future {
+    DB localTx { implicit session =>
+      val id = event.eventId.getOrElse(JDBCUtils.generateId)
+      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+      sql"""
+      insert into $tableName values(
+        $id,
+        ${event.event},
+        ${event.entityType},
+        ${event.entityId},
+        ${event.targetEntityType},
+        ${event.targetEntityId},
+        ${write(event.properties.toJObject)},
+        ${event.eventTime},
+        ${event.eventTime.getZone.getID},
+        ${if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None},
+        ${event.prId},
+        ${event.creationTime},
+        ${event.creationTime.getZone.getID}
+      )
+      """.update().apply()
+      id
+    }
+  }
+
+  def futureGet(eventId: String, appId: Int, channelId: Option[Int])(
+    implicit ec: ExecutionContext): Future[Option[Event]] = Future {
+    DB readOnly { implicit session =>
+      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+      sql"""
+      select
+        id,
+        event,
+        entityType,
+        entityId,
+        targetEntityType,
+        targetEntityId,
+        properties,
+        eventTime,
+        eventTimeZone,
+        tags,
+        prId,
+        creationTime,
+        creationTimeZone
+      from $tableName
+      where id = $eventId
+      """.map(resultToEvent).single().apply()
+    }
+  }
+
+  def futureDelete(eventId: String, appId: Int, channelId: Option[Int])(
+    implicit ec: ExecutionContext): Future[Boolean] = Future {
+    DB localTx { implicit session =>
+      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+      sql"""
+      delete from $tableName where id = $eventId
+      """.update().apply()
+      true
+    }
+  }
+
+  def futureFind(
+      appId: Int,
+      channelId: Option[Int] = None,
+      startTime: Option[DateTime] = None,
+      untilTime: Option[DateTime] = None,
+      entityType: Option[String] = None,
+      entityId: Option[String] = None,
+      eventNames: Option[Seq[String]] = None,
+      targetEntityType: Option[Option[String]] = None,
+      targetEntityId: Option[Option[String]] = None,
+      limit: Option[Int] = None,
+      reversed: Option[Boolean] = None
+    )(implicit ec: ExecutionContext): Future[Iterator[Event]] = Future {
+    DB readOnly { implicit session =>
+      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+      val whereClause = sqls.toAndConditionOpt(
+        startTime.map(x => sqls"eventTime >= $x"),
+        untilTime.map(x => sqls"eventTime < $x"),
+        entityType.map(x => sqls"entityType = $x"),
+        entityId.map(x => sqls"entityId = $x"),
+        eventNames.map(x =>
+          sqls.toOrConditionOpt(x.map(y =>
+            Some(sqls"event = $y")
+          ): _*)
+        ).getOrElse(None),
+        targetEntityType.map(x => x.map(y => sqls"targetEntityType = $y")
+            .getOrElse(sqls"targetEntityType IS NULL")),
+        targetEntityId.map(x => x.map(y => sqls"targetEntityId = $y")
+            .getOrElse(sqls"targetEntityId IS NULL"))
+      ).map(sqls.where(_)).getOrElse(sqls"")
+      val orderByClause = reversed.map(x =>
+        if (x) sqls"eventTime desc" else sqls"eventTime asc"
+      ).getOrElse(sqls"eventTime asc")
+      val limitClause = limit.map(x =>
+        if (x < 0) sqls"" else sqls.limit(x)
+      ).getOrElse(sqls"")
+      val q = sql"""
+      select
+        id,
+        event,
+        entityType,
+        entityId,
+        targetEntityType,
+        targetEntityId,
+        properties,
+        eventTime,
+        eventTimeZone,
+        tags,
+        prId,
+        creationTime,
+        creationTimeZone
+      from $tableName
+      $whereClause
+      order by $orderByClause
+      $limitClause
+      """
+      q.map(resultToEvent).list().apply().toIterator
+    }
+  }
+
+  private[predictionio] def resultToEvent(rs: WrappedResultSet): Event = {
+    Event(
+      eventId = rs.stringOpt("id"),
+      event = rs.string("event"),
+      entityType = rs.string("entityType"),
+      entityId = rs.string("entityId"),
+      targetEntityType = rs.stringOpt("targetEntityType"),
+      targetEntityId = rs.stringOpt("targetEntityId"),
+      properties = rs.stringOpt("properties").map(p =>
+        DataMap(read[JObject](p))).getOrElse(DataMap()),
+      eventTime = new DateTime(rs.jodaDateTime("eventTime"),
+        DateTimeZone.forID(rs.string("eventTimeZone"))),
+      tags = rs.stringOpt("tags").map(t => t.split(",").toList).getOrElse(Nil),
+      prId = rs.stringOpt("prId"),
+      creationTime = new DateTime(rs.jodaDateTime("creationTime"),
+        DateTimeZone.forID(rs.string("creationTimeZone")))
+    )
+  }
+}
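
For orientation, here is a minimal usage sketch of the JDBCLEvents class added above. Direct instantiation, the StorageClientConfig(properties = ...) constructor, and the REPL-style form are illustrative assumptions only; normally the PredictionIO storage factory performs this wiring, and a ScalikeJDBC connection pool must already be initialized for the same database (see the StorageClient later in this commit). The INDEX property and the Event fields are the ones used by the code above.

    import scala.concurrent.{Await, ExecutionContext}
    import scala.concurrent.duration._
    import org.joda.time.DateTime
    import org.apache.predictionio.data.storage.{DataMap, Event, StorageClientConfig}
    import org.apache.predictionio.data.storage.jdbc.JDBCLEvents

    implicit val ec: ExecutionContext = ExecutionContext.global

    // "INDEX" -> "enabled" selects the VARCHAR(255) schema with indexes on
    // entityId and entityType; omitting it yields the plain text-column schema.
    val events = new JDBCLEvents(
      "jdbc:postgresql://localhost/pio",
      StorageClientConfig(properties = Map("INDEX" -> "enabled")),  // assumed constructor
      "pio_event")

    events.init(appId = 1)  // creates table pio_event_1 if it does not exist

    val id = Await.result(events.futureInsert(
      Event(eventId = None, event = "rate", entityType = "user", entityId = "u1",
        targetEntityType = Some("item"), targetEntityId = Some("i1"),
        properties = DataMap(), eventTime = DateTime.now(), tags = Nil,
        prId = None, creationTime = DateTime.now()),
      appId = 1, channelId = None), 10.seconds)

    val userEvents = Await.result(
      events.futureFind(appId = 1, entityType = Some("user")), 10.seconds).toList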

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
new file mode 100644
index 0000000..b48502a
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[Models]] */
+class JDBCModels(client: String, config: StorageClientConfig, prefix: String)
+  extends Models with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "models")
+
+  /** Determines binary column type based on JDBC driver type */
+  val binaryColumnType = JDBCUtils.binaryColumnType(client)
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      id varchar(100) not null primary key,
+      models $binaryColumnType not null)""".execute().apply()
+  }
+
+  def insert(i: Model): Unit = DB localTx { implicit session =>
+    sql"insert into $tableName values(${i.id}, ${i.models})".update().apply()
+  }
+
+  def get(id: String): Option[Model] = DB readOnly { implicit session =>
+    sql"select id, models from $tableName where id = $id".map { r =>
+      Model(id = r.string("id"), models = r.bytes("models"))
+    }.single().apply()
+  }
+
+  def delete(id: String): Unit = DB localTx { implicit session =>
+    sql"delete from $tableName where id = $id".execute().apply()
+  }
+}
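
A corresponding sketch for the JDBCModels DAO above. The StorageClientConfig constructor is an assumption, and a ScalikeJDBC connection pool must already have been initialized, for example by the StorageClient later in this commit; the Model fields (id, models) are taken from the code in this diff.

    import org.apache.predictionio.data.storage.{Model, StorageClientConfig}
    import org.apache.predictionio.data.storage.jdbc.JDBCModels

    // The table becomes pio_models; the binary column type (bytea vs longblob)
    // is picked by JDBCUtils.binaryColumnType from the JDBC URL.
    val url = "jdbc:postgresql://localhost/pio"
    val modelsDao = new JDBCModels(url, StorageClientConfig(properties = Map()), "pio")

    modelsDao.insert(Model(id = "engine-instance-1", models = Array[Byte](1, 2, 3)))
    val stored = modelsDao.get("engine-instance-1")   // Some(Model(...)) if present
    modelsDao.delete("engine-instance-1")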

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
new file mode 100644
index 0000000..2e6ee83
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import java.sql.{DriverManager, ResultSet}
+
+import com.github.nscala_time.time.Imports._
+import org.apache.predictionio.data.storage.{DataMap, Event, PEvents, StorageClientConfig}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.{JdbcRDD, RDD}
+import org.apache.spark.sql.{SQLContext, SaveMode}
+import org.json4s.JObject
+import org.json4s.native.Serialization
+import scalikejdbc._
+
+/** JDBC implementation of [[PEvents]] */
+class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String) extends PEvents {
+  @transient private implicit lazy val formats = org.json4s.DefaultFormats
+  def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = {
+    val lower = startTime.map(_.getMillis).getOrElse(0.toLong)
+    /** Use now() + 1 year instead of now() + 100 years as the default upper
+      * bound, because MySQL's FROM_UNIXTIME(t) returns NULL that far in the future.
+      */
+    val upper = untilTime.map(_.getMillis).getOrElse((DateTime.now + 1.years).getMillis)
+    val par = scala.math.min(
+      new Duration(upper - lower).getStandardDays,
+      config.properties.getOrElse("PARTITIONS", "4").toLong).toInt
+    val entityTypeClause = entityType.map(x => s"and entityType = '$x'").getOrElse("")
+    val entityIdClause = entityId.map(x => s"and entityId = '$x'").getOrElse("")
+    val eventNamesClause =
+      eventNames.map("and (" + _.map(y => s"event = '$y'").mkString(" or ") + ")").getOrElse("")
+    val targetEntityTypeClause = targetEntityType.map(
+      _.map(x => s"and targetEntityType = '$x'"
+    ).getOrElse("and targetEntityType is null")).getOrElse("")
+    val targetEntityIdClause = targetEntityId.map(
+      _.map(x => s"and targetEntityId = '$x'"
+    ).getOrElse("and targetEntityId is null")).getOrElse("")
+    val q = s"""
+      select
+        id,
+        event,
+        entityType,
+        entityId,
+        targetEntityType,
+        targetEntityId,
+        properties,
+        eventTime,
+        eventTimeZone,
+        tags,
+        prId,
+        creationTime,
+        creationTimeZone
+      from ${JDBCUtils.eventTableName(namespace, appId, channelId)}
+      where
+        eventTime >= ${JDBCUtils.timestampFunction(client)}(?) and
+        eventTime < ${JDBCUtils.timestampFunction(client)}(?)
+      $entityTypeClause
+      $entityIdClause
+      $eventNamesClause
+      $targetEntityTypeClause
+      $targetEntityIdClause
+      """.replace("\n", " ")
+    new JdbcRDD(
+      sc,
+      () => {
+        DriverManager.getConnection(
+          client,
+          config.properties("USERNAME"),
+          config.properties("PASSWORD"))
+      },
+      q,
+      lower / 1000,
+      upper / 1000,
+      par,
+      (r: ResultSet) => {
+        Event(
+          eventId = Option(r.getString("id")),
+          event = r.getString("event"),
+          entityType = r.getString("entityType"),
+          entityId = r.getString("entityId"),
+          targetEntityType = Option(r.getString("targetEntityType")),
+          targetEntityId = Option(r.getString("targetEntityId")),
+          properties = Option(r.getString("properties")).map(x =>
+            DataMap(Serialization.read[JObject](x))).getOrElse(DataMap()),
+          eventTime = new DateTime(r.getTimestamp("eventTime").getTime,
+            DateTimeZone.forID(r.getString("eventTimeZone"))),
+          tags = Option(r.getString("tags")).map(x =>
+            x.split(",").toList).getOrElse(Nil),
+          prId = Option(r.getString("prId")),
+          creationTime = new DateTime(r.getTimestamp("creationTime").getTime,
+            DateTimeZone.forID(r.getString("creationTimeZone"))))
+      }).cache()
+  }
+
+  def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val sqlContext = new SQLContext(sc)
+
+    import sqlContext.implicits._
+
+    val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
+
+    val eventTableColumns = Seq[String](
+        "id"
+      , "event"
+      , "entityType"
+      , "entityId"
+      , "targetEntityType"
+      , "targetEntityId"
+      , "properties"
+      , "eventTime"
+      , "eventTimeZone"
+      , "tags"
+      , "prId"
+      , "creationTime"
+      , "creationTimeZone")
+
+    val eventDF = events.map(x =>
+                              Event(eventId = None, event = x.event, entityType = x.entityType,
+                              entityId = x.entityId, targetEntityType = x.targetEntityType,
+                              targetEntityId = x.targetEntityId, properties = x.properties,
+                              eventTime = x.eventTime, tags = x.tags, prId= x.prId,
+                              creationTime = x.eventTime)
+                            )
+                        .map { event =>
+      (event.eventId.getOrElse(JDBCUtils.generateId)
+        , event.event
+        , event.entityType
+        , event.entityId
+        , event.targetEntityType.orNull
+        , event.targetEntityId.orNull
+        , if (!event.properties.isEmpty) Serialization.write(event.properties.toJObject) else null
+        , new java.sql.Timestamp(event.eventTime.getMillis)
+        , event.eventTime.getZone.getID
+        , if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else null
+        , event.prId
+        , new java.sql.Timestamp(event.creationTime.getMillis)
+        , event.creationTime.getZone.getID)
+    }.toDF(eventTableColumns:_*)
+
+    // requires Spark 1.4.0 or higher (DataFrame writer with JDBC support)
+    val prop = new java.util.Properties
+    prop.setProperty("user", config.properties("USERNAME"))
+    prop.setProperty("password", config.properties("PASSWORD"))
+    eventDF.write.mode(SaveMode.Append).jdbc(client, tableName, prop)
+  }
+
+  def delete(eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+
+    eventIds.foreachPartition{ iter =>
+
+      iter.foreach { eventId =>
+        DB localTx { implicit session =>
+          val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
+          val table = SQLSyntax.createUnsafely(tableName)
+          sql"""
+          delete from $table where id = $eventId
+          """.update().apply()
+          true
+        }
+      }
+    }
+  }
+}
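
A read-side sketch for the JDBCPEvents implementation above. The local SparkContext and the StorageClientConfig constructor are illustrative assumptions; URL, USERNAME, PASSWORD and PARTITIONS are the property names the code actually reads.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.predictionio.data.storage.StorageClientConfig
    import org.apache.predictionio.data.storage.jdbc.JDBCPEvents

    val sc = new SparkContext(
      new SparkConf().setAppName("jdbc-pevents-sketch").setMaster("local[2]"))

    val url = "jdbc:postgresql://localhost/pio"
    val pEvents = new JDBCPEvents(
      url,
      StorageClientConfig(properties = Map(
        "URL" -> url,
        "USERNAME" -> "pio",
        "PASSWORD" -> "pio",
        "PARTITIONS" -> "4")),
      "pio_event")

    // Reads pio_event_1 into an RDD[Event]; the number of JdbcRDD partitions is
    // the smaller of PARTITIONS and the number of days in the queried time range.
    val rdd = pEvents.find(appId = 1, entityType = Some("user"))(sc)
    println(rdd.count())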

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
new file mode 100644
index 0000000..3eb55ba
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import scalikejdbc._
+
+/** JDBC related utilities */
+object JDBCUtils {
+  /** Extract JDBC driver type from URL
+    *
+    * @param url JDBC URL
+    * @return The driver type, e.g. postgresql
+    */
+  def driverType(url: String): String = {
+    val capture = """jdbc:([^:]+):""".r
+    capture findFirstIn url match {
+      case Some(capture(driverType)) => driverType
+      case None => ""
+    }
+  }
+
+  /** Determines binary column type from JDBC URL
+    *
+    * @param url JDBC URL
+    * @return Binary column type as SQLSyntax, e.g. LONGBLOB
+    */
+  def binaryColumnType(url: String): SQLSyntax = {
+    driverType(url) match {
+      case "postgresql" => sqls"bytea"
+      case "mysql" => sqls"longblob"
+      case _ => sqls"longblob"
+    }
+  }
+
+  /** Determines UNIX timestamp conversion function from JDBC URL
+    *
+    * @param url JDBC URL
+    * @return Timestamp conversion function, e.g. TO_TIMESTAMP
+    */
+  def timestampFunction(url: String): String = {
+    driverType(url) match {
+      case "postgresql" => "to_timestamp"
+      case "mysql" => "from_unixtime"
+      case _ => "from_unixtime"
+    }
+  }
+
+  /** Converts a Map[String, String] to a comma-separated list of key=value pairs
+    *
+    * @param m Map of String to String
+    * @return Comma-separated list, e.g. FOO=BAR,X=Y,...
+    */
+  def mapToString(m: Map[String, String]): String = {
+    m.map(t => s"${t._1}=${t._2}").mkString(",")
+  }
+
+  /** Inverse of mapToString
+    *
+    * @param str Comma-separated list, e.g. FOO=BAR,X=Y,...
+    * @return Map of String to String, e.g. Map("FOO" -> "BAR", "X" -> "Y", ...)
+    */
+  def stringToMap(str: String): Map[String, String] = {
+    str.split(",").map { x =>
+      val y = x.split("=")
+      y(0) -> y(1)
+    }.toMap[String, String]
+  }
+
+  /** Generates a 32-character random ID: a UUID with the dashes stripped */
+  def generateId: String = java.util.UUID.randomUUID().toString.replace("-", "")
+
+  /** Prefix a table name
+    *
+    * @param prefix Table prefix
+    * @param table Table name
+    * @return Prefixed table name
+    */
+  def prefixTableName(prefix: String, table: String): SQLSyntax =
+    sqls.createUnsafely(s"${prefix}_$table")
+
+  /** Derive event table name
+    *
+    * @param namespace Namespace of event tables
+    * @param appId App ID
+    * @param channelId Optional channel ID
+    * @return Full event table name
+    */
+  def eventTableName(namespace: String, appId: Int, channelId: Option[Int]): String =
+    s"${namespace}_${appId}${channelId.map("_" + _).getOrElse("")}"
+}
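
Because the helpers above are pure functions, their behaviour can be shown directly; the expected values in the comments follow from the code in this diff.

    import org.apache.predictionio.data.storage.jdbc.JDBCUtils

    JDBCUtils.driverType("jdbc:postgresql://localhost/pio")  // "postgresql"
    JDBCUtils.driverType("not-a-jdbc-url")                   // "" (no match)
    JDBCUtils.eventTableName("pio_event", 8, None)           // "pio_event_8"
    JDBCUtils.eventTableName("pio_event", 8, Some(2))        // "pio_event_8_2"
    JDBCUtils.mapToString(Map("FOO" -> "BAR", "X" -> "Y"))   // "FOO=BAR,X=Y" (map iteration order)
    JDBCUtils.stringToMap("FOO=BAR,X=Y")                     // Map("FOO" -> "BAR", "X" -> "Y")
    JDBCUtils.generateId.length                              // 32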

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
new file mode 100644
index 0000000..661e05e
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import scalikejdbc._
+
+/** JDBC implementation of [[BaseStorageClient]] */
+class StorageClient(val config: StorageClientConfig)
+  extends BaseStorageClient with Logging {
+  override val prefix = "JDBC"
+
+  if (!config.properties.contains("URL")) {
+    throw new StorageClientException("The URL variable is not set!", null)
+  }
+  if (!config.properties.contains("USERNAME")) {
+    throw new StorageClientException("The USERNAME variable is not set!", null)
+  }
+  if (!config.properties.contains("PASSWORD")) {
+    throw new StorageClientException("The PASSWORD variable is not set!", null)
+  }
+
+  // set max size of connection pool
+  val maxSize: Int = config.properties.getOrElse("CONNECTIONS", "8").toInt
+  val settings = ConnectionPoolSettings(maxSize = maxSize)
+
+  ConnectionPool.singleton(
+    config.properties("URL"),
+    config.properties("USERNAME"),
+    config.properties("PASSWORD"),
+    settings)
+  /** JDBC connection URL. Connections are managed by ScalikeJDBC. */
+  val client = config.properties("URL")
+}
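
The properties validated and consumed by the StorageClient above can be summarized with a small sketch. The StorageClientConfig constructor is an assumption; URL, USERNAME, PASSWORD and CONNECTIONS are the keys the code checks and reads.

    import org.apache.predictionio.data.storage.StorageClientConfig
    import org.apache.predictionio.data.storage.jdbc.StorageClient

    val config = StorageClientConfig(properties = Map(
      "URL"         -> "jdbc:postgresql://localhost/pio",  // required
      "USERNAME"    -> "pio",                              // required
      "PASSWORD"    -> "pio",                              // required
      "CONNECTIONS" -> "16"))                              // optional, defaults to 8

    // A missing URL, USERNAME or PASSWORD raises StorageClientException; otherwise
    // a ScalikeJDBC singleton connection pool of size CONNECTIONS is initialized.
    val storageClient = new StorageClient(config)
    storageClient.client   // the JDBC URL that the DAOs above receive as `client`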

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
new file mode 100644
index 0000000..e552e54
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage
+
+/** JDBC implementations of the storage traits, supporting metadata, event data,
+  * and model data
+  *
+  * @group Implementation
+  */
+package object jdbc {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/resources/application.conf b/storage/jdbc/src/test/resources/application.conf
new file mode 100644
index 0000000..eecae44
--- /dev/null
+++ b/storage/jdbc/src/test/resources/application.conf
@@ -0,0 +1,28 @@
+org.apache.predictionio.data.storage {
+  sources {
+    mongodb {
+      type = mongodb
+      hosts = [localhost]
+      ports = [27017]
+    }
+    elasticsearch {
+      type = elasticsearch
+      hosts = [localhost]
+      ports = [9300]
+    }
+  }
+  repositories {
+    # This section is a dummy placeholder that exists only to satisfy the storage layer.
+    # The actual tests bypass these repository settings completely.
+    # Please refer to StorageTestUtils.scala.
+    settings {
+      name = "test_predictionio"
+      source = mongodb
+    }
+
+    appdata {
+      name = "test_predictionio_appdata"
+      source = mongodb
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/localfs/.gitignore
----------------------------------------------------------------------
diff --git a/storage/localfs/.gitignore b/storage/localfs/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/storage/localfs/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/localfs/build.sbt
----------------------------------------------------------------------
diff --git a/storage/localfs/build.sbt b/storage/localfs/build.sbt
new file mode 100644
index 0000000..2cf9977
--- /dev/null
+++ b/storage/localfs/build.sbt
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "apache-predictionio-data-localfs"
+
+libraryDependencies ++= Seq(
+  "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
+  "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided",
+  "org.scalatest"           %% "scalatest"      % "2.1.7" % "test",
+  "org.specs2"              %% "specs2"         % "2.3.13" % "test")
+
+parallelExecution in Test := false
+
+pomExtra := childrenPomExtra.value
+
+assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true)
+
+assemblyMergeStrategy in assembly := {
+  case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
+  case PathList("META-INF", "NOTICE.txt")  => MergeStrategy.concat
+  case x =>
+    val oldStrategy = (assemblyMergeStrategy in assembly).value
+    oldStrategy(x)
+}
+
+// skip test in assembly
+test in assembly := {}
+
+outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-localfs-assembly-" + version.value + ".jar")
+

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/localfs/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
----------------------------------------------------------------------
diff --git a/storage/localfs/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala b/storage/localfs/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
new file mode 100644
index 0000000..f528af9
--- /dev/null
+++ b/storage/localfs/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.localfs
+
+import java.io.File
+import java.io.FileNotFoundException
+import java.io.FileOutputStream
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+
+import scala.io.Source
+
+class LocalFSModels(f: File, config: StorageClientConfig, prefix: String)
+  extends Models with Logging {
+
+  def insert(i: Model): Unit = {
+    try {
+      val fos = new FileOutputStream(new File(f, s"${prefix}${i.id}"))
+      fos.write(i.models)
+      fos.close
+    } catch {
+      case e: FileNotFoundException => error(e.getMessage)
+    }
+  }
+
+  def get(id: String): Option[Model] = {
+    try {
+      Some(Model(
+        id = id,
+        models = Source.fromFile(new File(f, s"${prefix}${id}"))(
+          scala.io.Codec.ISO8859).map(_.toByte).toArray))
+    } catch {
+      case e: Throwable =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def delete(id: String): Unit = {
+    val m = new File(f, s"${prefix}${id}")
+    if (!m.delete) error(s"Unable to delete ${m.getCanonicalPath}!")
+  }
+}
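
Finally, a short sketch of the file-backed Models DAO above; the temporary directory, the StorageClientConfig constructor, and direct instantiation are illustrative assumptions.

    import java.io.File
    import org.apache.predictionio.data.storage.{Model, StorageClientConfig}
    import org.apache.predictionio.data.storage.localfs.LocalFSModels

    val dir = new File(System.getProperty("java.io.tmpdir"))
    val models = new LocalFSModels(dir, StorageClientConfig(properties = Map()), "pio_")

    // Each model is written to a file named <prefix><id>; reading it back via the
    // ISO-8859-1 codec preserves arbitrary binary content byte for byte.
    models.insert(Model(id = "engine-x", models = Array[Byte](0, 1, 2, 127)))
    val restored = models.get("engine-x")
    models.delete("engine-x")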