You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/03/08 07:45:57 UTC
[2/7] incubator-predictionio git commit: [PIO-49] Add support for
Elasticsearch 5
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala
new file mode 100644
index 0000000..de74d46
--- /dev/null
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase.upgrade
+
+import org.apache.predictionio.annotation.Experimental
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.hbase.HBLEvents
+import org.apache.predictionio.data.storage.hbase.HBEventsUtil
+
+import scala.collection.JavaConversions._
+
+import scala.concurrent._
+import ExecutionContext.Implicits.global
+import org.apache.predictionio.data.storage.LEvents
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import java.lang.Thread
+
+/** Utility that reports how the events of an app are distributed over
+  * (entityType, targetEntityType) pairs.
+  */
+object CheckDistribution {
+  /** Counts the events of an app per (entityType, targetEntityType) pair.
+    *
+    * @param eventClient client used to read the events
+    * @param appId id of the app whose events are counted
+    * @return map from (entityType, targetEntityType) to the number of events;
+    *         looking up a pair that was never seen yields 0
+    */
+  def entityType(eventClient: LEvents, appId: Int)
+  : Map[(String, Option[String]), Int] = {
+    val emptyCounts =
+      Map.empty[(String, Option[String]), Int].withDefaultValue(0)
+    eventClient
+      .find(appId = appId)
+      .foldLeft(emptyCounts) { (counts, event) =>
+        val key = (event.entityType, event.targetEntityType)
+        counts.updated(key, counts(key) + 1)
+      }
+  }
+
+  /** Prints the distribution of the given app, most frequent pair first. */
+  def runMain(appId: Int): Unit = {
+    val eventClient = Storage.getLEvents().asInstanceOf[HBLEvents]
+
+    entityType(eventClient, appId)
+      .toSeq
+      .sortBy(-_._2)
+      .foreach(println)
+  }
+
+  /** Command line entry point; `args(0)` is the app id.
+    *
+    * @throws IllegalArgumentException if no app id is given
+    */
+  def main(args: Array[String]): Unit = {
+    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
+    require(args.length >= 1, "Usage: CheckDistribution <appId>")
+    runMain(args(0).toInt)
+  }
+
+}
+
+/** :: Experimental ::
+  * Copies the events of one app into another app while renaming obsolete
+  * `pio_`-prefixed entity types and properties.
+  * For upgrade from 0.8.2 to 0.8.3 only.
+  */
+@Experimental
+object Upgrade_0_8_3 {
+  /** Obsolete entity type name -> replacement name. */
+  val NameMap = Map(
+    "pio_user" -> "user",
+    "pio_item" -> "item")
+  /** Inverse of NameMap: replacement name -> obsolete name. */
+  val RevNameMap = NameMap.toSeq.map(_.swap).toMap
+
+  val logger = Logger[this.type]
+
+  /** Command line entry point; `args(0)` is the source app id and `args(1)`
+    * the target app id.
+    *
+    * @throws IllegalArgumentException if fewer than two arguments are given
+    */
+  def main(args: Array[String]): Unit = {
+    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
+    require(args.length >= 2, "Usage: Upgrade_0_8_3 <fromAppId> <toAppId>")
+    val fromAppId = args(0).toInt
+    val toAppId = args(1).toInt
+
+    runMain(fromAppId, toAppId)
+  }
+
+  def runMain(fromAppId: Int, toAppId: Int): Unit = {
+    upgrade(fromAppId, toAppId)
+  }
+
+
+  /** Entity type names that are renamed by the upgrade. */
+  val obsEntityTypes = Set("pio_user", "pio_item")
+  /** Property names that lose their `pio_` prefix during the upgrade. */
+  val obsProperties = Set(
+    "pio_itypes", "pio_starttime", "pio_endtime",
+    "pio_inactive", "pio_price", "pio_rating")
+
+  /** Maps an entity type to its replacement name; other names are unchanged. */
+  private def renamed(entityType: String): String =
+    NameMap.getOrElse(entityType, entityType)
+
+  /** Returns true if at least one event of the app uses an obsolete entity
+    * type, an obsolete target entity type or an obsolete property name.
+    */
+  def hasPIOPrefix(eventClient: LEvents, appId: Int): Boolean = {
+    eventClient.find(appId = appId).exists { e =>
+      obsEntityTypes.contains(e.entityType) ||
+        e.targetEntityType.exists(obsEntityTypes.contains) ||
+        e.properties.keySet.exists(obsProperties.contains)
+    }
+  }
+
+  /** Returns true if the app has no events. */
+  def isEmpty(eventClient: LEvents, appId: Int): Boolean =
+    !eventClient.find(appId = appId).hasNext
+
+
+  /** Copies every event of `fromAppId` into `toAppId` with renamed entity
+    * types and properties, then compares the per-entity-type event counts of
+    * both apps and logs whether they agree.
+    *
+    * @param eventClient client used to read and insert events
+    * @param fromAppId app whose events are read
+    * @param toAppId app that receives the converted events
+    */
+  def upgradeCopy(eventClient: LEvents, fromAppId: Int, toAppId: Int): Unit = {
+    val fromDist = CheckDistribution.entityType(eventClient, fromAppId)
+
+    logger.info("FromAppId Distribution")
+    fromDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) }
+
+    eventClient
+      .find(appId = fromAppId)
+      .zipWithIndex
+      .foreach { case (fromEvent, index) =>
+        if (index % 50000 == 0) {
+          logger.info(s"Progress: $index")
+        }
+
+        val toProperties = DataMap(fromEvent.properties.fields.map {
+          case (k, v) =>
+            val newK = if (obsProperties.contains(k)) {
+              val nK = k.stripPrefix("pio_")
+              logger.info(s"property ${k} will be renamed to ${nK}")
+              nK
+            } else {
+              k
+            }
+            (newK, v)
+        })
+
+        val toEvent = fromEvent.copy(
+          entityType = renamed(fromEvent.entityType),
+          targetEntityType = fromEvent.targetEntityType.map(renamed),
+          properties = toProperties)
+
+        eventClient.insert(toEvent, toAppId)
+      }
+
+
+    val toDist = CheckDistribution.entityType(eventClient, toAppId)
+
+    logger.info("Recap fromAppId Distribution")
+    fromDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) }
+
+    logger.info("ToAppId Distribution")
+    toDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) }
+
+    // Expected target distribution: the source counts re-keyed by the renamed
+    // entity types. Counts are summed because an obsolete name and its
+    // replacement (e.g. "pio_user" and "user") collapse onto the same key.
+    // Comparing against this map, rather than mapping target names back through
+    // RevNameMap, also keeps source apps that already use "user"/"item" from
+    // being reported as mismatches.
+    val expectedDist: Map[(String, Option[String]), Int] = fromDist.toSeq
+      .groupBy { case ((et, tet), _) => (renamed(et), tet.map(renamed)) }
+      .map { case (k, entries) => (k, entries.map(_._2).sum) }
+
+    // Every expected key must be present in the target with the same count.
+    val fromGood = expectedDist.toSeq.forall { case (k, c) =>
+      val nc = toDist.getOrElse(k, -1)
+      val checkMatch = (c == nc)
+      if (!checkMatch) {
+        logger.info(s"${k} doesn't match: old has ${c}. new has ${nc}.")
+      }
+      checkMatch
+    }
+
+    // The target must not contain keys or counts that were not expected.
+    val toGood = toDist.toSeq.forall { case (k, c) =>
+      val oc = expectedDist.getOrElse(k, -1)
+      val checkMatch = (c == oc)
+      if (!checkMatch) {
+        logger.info(s"${k} doesn't match: new has ${c}. old has ${oc}.")
+      }
+      checkMatch
+    }
+
+    if (!fromGood || !toGood) {
+      logger.error("Doesn't match!! There is an import error.")
+    } else {
+      logger.info("Count matches. Looks like we are good to go.")
+    }
+  }
+
+  /** Runs the upgrade. For upgrade from 0.8.2 to 0.8.3 only.
+    *
+    * Events are copied only when the source app contains obsolete names; in
+    * that case the target app must be empty.
+    *
+    * @throws IllegalArgumentException if both app ids are equal, or if a copy
+    *         is needed and the target app already has events
+    */
+  def upgrade(fromAppId: Int, toAppId: Int): Unit = {
+
+    val eventClient = Storage.getLEvents().asInstanceOf[HBLEvents]
+
+    require(fromAppId != toAppId,
+      s"FromAppId: $fromAppId must be different from toAppId: $toAppId")
+
+    if (hasPIOPrefix(eventClient, fromAppId)) {
+      require(
+        isEmpty(eventClient, toAppId),
+        s"Target appId: $toAppId is not empty. Please run " +
+        "`pio app data-delete <app_name>` to clean the data before upgrading")
+
+      logger.info(s"$fromAppId isEmpty: " + isEmpty(eventClient, fromAppId))
+
+      upgradeCopy(eventClient, fromAppId, toAppId)
+
+    } else {
+      logger.info(s"From appId: ${fromAppId} doesn't contain" +
+        s" obsolete entityTypes ${obsEntityTypes} or" +
+        s" obsolete properties ${obsProperties}." +
+        " No need data migration." +
+        s" You can continue to use appId ${fromAppId}.")
+    }
+
+    logger.info("Done.")
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
new file mode 100644
index 0000000..b453820
--- /dev/null
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.view
+
+import org.apache.predictionio.data.storage.hbase.HBPEvents
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Storage
+
+import org.joda.time.DateTime
+
+import org.json4s.JValue
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+
+// A JValue together with the time `t` at which it was set.
+private[predictionio] case class PropTime(d: JValue, t: Long) extends Serializable
+
+// Properties that have been set, each with its own set time, plus the time of
+// the most recent set operation. `fields` may be empty with a valid `t`.
+private[predictionio] case class SetProp (
+  fields: Map[String, PropTime],
+  t: Long) extends Serializable {
+
+  // Merges two SetProps. For a key present on both sides the entry with the
+  // strictly later time is kept; on a tie the entry of `that` wins.
+  def ++ (that: SetProp): SetProp = {
+    val mergedFields = that.fields.foldLeft(this.fields) {
+      case (acc, (key, theirs)) =>
+        val winner = acc.get(key) match {
+          case Some(ours) if ours.t > theirs.t => ours
+          case _ => theirs
+        }
+        acc.updated(key, winner)
+    }
+
+    // the later of the two set times
+    SetProp(fields = mergedFields, t = math.max(this.t, that.t))
+  }
+}
+
+// Properties that have been unset, mapped to the time of the unset.
+private[predictionio] case class UnsetProp (fields: Map[String, Long]) extends Serializable {
+  // Merges two UnsetProps, keeping the latest unset time of every key.
+  def ++ (that: UnsetProp): UnsetProp = {
+    val mergedFields = that.fields.foldLeft(this.fields) {
+      case (acc, (key, theirTime)) =>
+        val latest = acc.get(key).fold(theirTime)(math.max(_, theirTime))
+        acc.updated(key, latest)
+    }
+
+    UnsetProp(fields = mergedFields)
+  }
+}
+
+// Time at which an entity was deleted.
+private[predictionio] case class DeleteEntity (t: Long) extends Serializable {
+  // Keeps the later deletion; on a tie `that` is returned.
+  def ++ (that: DeleteEntity): DeleteEntity =
+    if (that.t >= this.t) that else this
+}
+
+/** Combined effect of the set / unset / delete operations seen for one entity.
+  *
+  * @param setProp merged set operations, if any
+  * @param unsetProp merged unset operations, if any
+  * @param deleteEntity latest delete operation, if any
+  */
+private[predictionio] case class EventOp (
+  val setProp: Option[SetProp] = None,
+  val unsetProp: Option[UnsetProp] = None,
+  val deleteEntity: Option[DeleteEntity] = None
+) extends Serializable {
+
+  /** Merges two EventOps component by component. */
+  def ++ (that: EventOp): EventOp = {
+    EventOp(
+      setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
+      unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
+      deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _)
+    )
+  }
+
+  /** Resolves the operations into the entity's current properties.
+    *
+    * @return None if nothing was ever set, or if the entity was deleted at or
+    *         after its last set time; otherwise the set properties minus those
+    *         unset or deleted at or after the time they were set
+    */
+  def toDataMap(): Option[DataMap] = {
+    setProp.flatMap { set =>
+
+      // Keys whose unset time is at or after their set time. A key that was
+      // unset but never set has nothing to remove, so it is skipped; looking
+      // it up with set.fields(k) would throw NoSuchElementException.
+      val unsetKeys: Set[String] = unsetProp.map { unset =>
+        unset.fields.filter { case (k, unsetTime) =>
+          set.fields.get(k).exists(pt => unsetTime >= pt.t)
+        }.keySet
+      }.getOrElse(Set.empty[String])
+
+      val combinedFields = deleteEntity match {
+        // deleted at or after the last set: the entity no longer exists
+        case Some(delete) if delete.t >= set.t =>
+          None
+        // deleted earlier: drop only the properties set before the delete
+        case Some(delete) =>
+          val deleteKeys: Set[String] = set.fields
+            .filter { case (_, PropTime(_, t)) => delete.t >= t }
+            .keySet
+          Some(set.fields -- unsetKeys -- deleteKeys)
+        case None =>
+          Some(set.fields -- unsetKeys)
+      }
+
+      // Build a strict Map rather than a mapValues() view, which is not
+      // serializable (see https://issues.scala-lang.org/browse/SI-7005).
+      combinedFields.map(f => DataMap(f.map { case (k, pt) => (k, pt.d) }))
+    }
+  }
+
+}
+
+private[predictionio] object EventOp {
+  /** Converts a single event into the EventOp it represents.
+    *
+    * `$set`, `$unset` and `$delete` events are stamped with the event time in
+    * milliseconds; any other event yields an empty EventOp.
+    */
+  def apply(e: Event): EventOp = {
+    val t = e.eventTime.getMillis
+    e.event match {
+      case "$set" =>
+        // strict map (not a mapValues view) so the result stays serializable
+        val timed = e.properties.fields.map { case (name, jv) =>
+          (name, PropTime(jv, t))
+        }
+        EventOp(setProp = Some(SetProp(fields = timed, t = t)))
+      case "$unset" =>
+        val timed = e.properties.fields.map { case (name, _) => (name, t) }
+        EventOp(unsetProp = Some(UnsetProp(fields = timed)))
+      case "$delete" =>
+        EventOp(deleteEntity = Some(DeleteEntity(t)))
+      case _ =>
+        EventOp()
+    }
+  }
+}
+
+/** Batch view over the events of one app, read as an RDD.
+  *
+  * @param appId app whose events are read
+  * @param startTime passed to the events store as the start of the time range
+  * @param untilTime passed to the events store as the end of the time range
+  * @param sc Spark context used to create the RDD
+  */
+@deprecated("Use PEvents or PEventStore instead.", "0.9.2")
+class PBatchView(
+  val appId: Int,
+  val startTime: Option[DateTime],
+  val untilTime: Option[DateTime],
+  val sc: SparkContext) {
+
+  // NOTE: parallel Events DB interface
+  @transient lazy val eventsDb = Storage.getPEvents()
+
+  @transient lazy val _events: RDD[Event] =
+    eventsDb.getByAppIdAndTimeAndEntity(
+      appId = appId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = None,
+      entityId = None)(sc)
+
+  // TODO: change to use EventSeq?
+  @transient lazy val events: RDD[Event] = _events
+
+  /** Aggregates the special events of one entity type into the current
+    * properties of each entity.
+    *
+    * NOTE(review): `startTimeOpt` and `untilTimeOpt` are never read in this
+    * method, so they do not narrow the result - confirm whether that is intended.
+    *
+    * @param entityType entity type to aggregate
+    * @return (entityId, properties) for every entity whose operations resolve
+    *         to a DataMap
+    */
+  def aggregateProperties(
+    entityType: String,
+    startTimeOpt: Option[DateTime] = None,
+    untilTimeOpt: Option[DateTime] = None
+  ): RDD[(String, DataMap)] = {
+
+    val specialEvents = _events.filter { e =>
+      e.entityType == entityType && EventValidation.isSpecialEvents(e.event)
+    }
+
+    val opsByEntity = specialEvents
+      .map(e => (e.entityId, EventOp(e)))
+      .aggregateByKey[EventOp](EventOp())(
+        // within same partition
+        seqOp = (acc, op) => acc ++ op,
+        // across partition
+        combOp = (left, right) => left ++ right
+      )
+
+    // keep only the entities that still have properties
+    opsByEntity
+      .mapValues(_.toDataMap)
+      .collect { case (id, Some(dataMap)) => (id, dataMap) }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/storage/hbase/src/test/resources/application.conf b/storage/hbase/src/test/resources/application.conf
new file mode 100644
index 0000000..eecae44
--- /dev/null
+++ b/storage/hbase/src/test/resources/application.conf
@@ -0,0 +1,28 @@
+org.apache.predictionio.data.storage {
+ sources {
+ mongodb {
+ type = mongodb
+ hosts = [localhost]
+ ports = [27017]
+ }
+ elasticsearch {
+ type = elasticsearch
+ hosts = [localhost]
+ ports = [9300]
+ }
+ }
+ repositories {
+ # This section is a dummy that exists just to make storage happy.
+ # The actual testing will not bypass these repository settings completely.
+ # Please refer to StorageTestUtils.scala.
+ settings {
+ name = "test_predictionio"
+ source = mongodb
+ }
+
+ appdata {
+ name = "test_predictionio_appdata"
+ source = mongodb
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/.gitignore
----------------------------------------------------------------------
diff --git a/storage/hdfs/.gitignore b/storage/hdfs/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/storage/hdfs/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/build.sbt
----------------------------------------------------------------------
diff --git a/storage/hdfs/build.sbt b/storage/hdfs/build.sbt
new file mode 100644
index 0000000..9f064c6
--- /dev/null
+++ b/storage/hdfs/build.sbt
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build definition of the HDFS storage module.
+name := "apache-predictionio-data-hdfs"
+
+// Core and data modules are "provided": needed to compile, not packaged here.
+libraryDependencies ++= Seq(
+ "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
+ "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided",
+ "org.scalatest" %% "scalatest" % "2.1.7" % "test",
+ "org.specs2" %% "specs2" % "2.3.13" % "test")
+
+// Run tests one at a time.
+parallelExecution in Test := false
+
+// childrenPomExtra is defined outside this file (presumably in the parent build).
+pomExtra := childrenPomExtra.value
+
+// Assembly jar: bundle dependencies but leave out the Scala library.
+assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true)
+
+// Concatenate duplicate LICENSE/NOTICE files; defer to the previous strategy otherwise.
+assemblyMergeStrategy in assembly := {
+ case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
+ case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat
+ case x =>
+ val oldStrategy = (assemblyMergeStrategy in assembly).value
+ oldStrategy(x)
+}
+
+// skip test in assembly
+test in assembly := {}
+
+// Write the jar to <two directories above this module>/assembly/spark/.
+outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-hdfs-assembly-" + version.value + ".jar")
+
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala
----------------------------------------------------------------------
diff --git a/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala
new file mode 100644
index 0000000..08dfb01
--- /dev/null
+++ b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hdfs
+
+import java.io.IOException
+
+import com.google.common.io.ByteStreams
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+/** Stores each model as one file on a Hadoop FileSystem.
+  *
+  * The path of a model file is `prefix` immediately followed by the model id.
+  *
+  * @param fs file system that holds the model files
+  * @param config storage client configuration (not read by the code in this class)
+  * @param prefix string prepended to a model id to form its file path
+  */
+class HDFSModels(fs: FileSystem, config: StorageClientConfig, prefix: String)
+  extends Models with Logging {
+
+  /** Writes the model bytes to the model's file.
+    *
+    * An IOException is logged and not rethrown.
+    */
+  def insert(i: Model): Unit = {
+    try {
+      val fsdos = fs.create(new Path(s"$prefix${i.id}"))
+      try {
+        fsdos.write(i.models)
+      } finally {
+        // always release the stream, even when the write fails
+        fsdos.close()
+      }
+    } catch {
+      case e: IOException => error(e.getMessage)
+    }
+  }
+
+  /** Reads the model with the given id.
+    *
+    * @return the model, or None if reading failed (the failure is logged)
+    */
+  def get(id: String): Option[Model] = {
+    try {
+      val p = new Path(s"$prefix$id")
+      val stream = fs.open(p)
+      try {
+        Some(Model(
+          id = id,
+          models = ByteStreams.toByteArray(stream)))
+      } finally {
+        // toByteArray does not close its argument
+        stream.close()
+      }
+    } catch {
+      // NonFatal lets VM errors and interrupts propagate instead of being
+      // reported as a missing model.
+      case scala.util.control.NonFatal(e) =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  /** Deletes the model's file (non-recursive) and logs an error if the file
+    * system reports that nothing was deleted.
+    */
+  def delete(id: String): Unit = {
+    val p = new Path(s"$prefix$id")
+    if (!fs.delete(p, false)) {
+      error(s"Unable to delete ${fs.makeQualified(p).toString}!")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala
----------------------------------------------------------------------
diff --git a/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala
new file mode 100644
index 0000000..bc57f2a
--- /dev/null
+++ b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hdfs
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+/** HDFS storage client. Exposes a Hadoop FileSystem whose working directory is
+  * taken from the `PATH` property, or from `HOSTS` when `PATH` is absent.
+  */
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+    with Logging {
+  override val prefix = "HDFS"
+  val conf = new Configuration
+  val fs = FileSystem.get(conf)
+  // `HOSTS` is only looked up when `PATH` is missing (the default is by-name).
+  fs.setWorkingDirectory {
+    val props = config.properties
+    new Path(props.getOrElse("PATH", props("HOSTS")))
+  }
+  val client = fs
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala
----------------------------------------------------------------------
diff --git a/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala
new file mode 100644
index 0000000..a927d78
--- /dev/null
+++ b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage
+
+/** HDFS implementation of storage traits, supporting model data only.
+ *
+ * The package object has no members; it exists to carry this package-level
+ * documentation.
+ *
+ * @group Implementation
+ */
+package object hdfs {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/storage/hdfs/src/test/resources/application.conf b/storage/hdfs/src/test/resources/application.conf
new file mode 100644
index 0000000..eecae44
--- /dev/null
+++ b/storage/hdfs/src/test/resources/application.conf
@@ -0,0 +1,28 @@
+org.apache.predictionio.data.storage {
+ sources {
+ mongodb {
+ type = mongodb
+ hosts = [localhost]
+ ports = [27017]
+ }
+ elasticsearch {
+ type = elasticsearch
+ hosts = [localhost]
+ ports = [9300]
+ }
+ }
+ repositories {
+ # This section is a dummy that exists just to make storage happy.
+ # The actual testing will not bypass these repository settings completely.
+ # Please refer to StorageTestUtils.scala.
+ settings {
+ name = "test_predictionio"
+ source = mongodb
+ }
+
+ appdata {
+ name = "test_predictionio_appdata"
+ source = mongodb
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/.gitignore
----------------------------------------------------------------------
diff --git a/storage/jdbc/.gitignore b/storage/jdbc/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/storage/jdbc/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/build.sbt
----------------------------------------------------------------------
diff --git a/storage/jdbc/build.sbt b/storage/jdbc/build.sbt
new file mode 100644
index 0000000..63d420b
--- /dev/null
+++ b/storage/jdbc/build.sbt
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build definition of the JDBC storage module.
+name := "apache-predictionio-data-jdbc"
+
+// Core, data and spark-sql are "provided": needed to compile, not packaged here.
+// The PostgreSQL driver and scalikejdbc have no scope qualifier, so they use the
+// default (compile) scope.
+libraryDependencies ++= Seq(
+ "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
+ "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided",
+ "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
+ "org.postgresql" % "postgresql" % "9.4-1204-jdbc41",
+ "org.scalikejdbc" %% "scalikejdbc" % "2.3.5",
+ "org.scalatest" %% "scalatest" % "2.1.7" % "test",
+ "org.specs2" %% "specs2" % "2.3.13" % "test")
+
+// Run tests one at a time.
+parallelExecution in Test := false
+
+// childrenPomExtra is defined outside this file (presumably in the parent build).
+pomExtra := childrenPomExtra.value
+
+// Assembly jar: bundle dependencies but leave out the Scala library.
+assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true)
+
+// Concatenate duplicate LICENSE/NOTICE files; defer to the previous strategy otherwise.
+assemblyMergeStrategy in assembly := {
+ case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
+ case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat
+ case x =>
+ val oldStrategy = (assemblyMergeStrategy in assembly).value
+ oldStrategy(x)
+}
+
+// skip test in assembly
+test in assembly := {}
+
+// Write the jar to <two directories above this module>/assembly/spark/.
+outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-jdbc-assembly-" + version.value + ".jar")
+
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala
new file mode 100644
index 0000000..437f8ae
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.AccessKey
+import org.apache.predictionio.data.storage.AccessKeys
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+import scala.util.Random
+
+/** JDBC implementation of [[AccessKeys]]
+ *
+ * Constructing an instance runs `create table if not exists` for the access
+ * key table. The permitted events of a key are stored in a single text column
+ * as a comma-separated list; an empty list is stored as SQL NULL.
+ *
+ * @param client not read by the code in this class
+ * @param config not read by the code in this class
+ * @param prefix passed to JDBCUtils.prefixTableName to build the table name
+ */
+class JDBCAccessKeys(client: String, config: StorageClientConfig, prefix: String)
+ extends AccessKeys with Logging {
+ /** Database table name for this data access object */
+ val tableName = JDBCUtils.prefixTableName(prefix, "accesskeys")
+ // Runs as part of the constructor: make sure the table exists.
+ DB autoCommit { implicit session =>
+ sql"""
+ create table if not exists $tableName (
+ accesskey varchar(64) not null primary key,
+ appid integer not null,
+ events text)""".execute().apply()
+ }
+
+ /** Inserts an access key and returns the key that was stored.
+ *
+ * When `accessKey.key` is empty, the key comes from `generateKey`
+ * (not defined in this class; presumably inherited from AccessKeys).
+ */
+ def insert(accessKey: AccessKey): Option[String] = DB localTx { implicit s =>
+ val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
+ // None is bound as NULL; otherwise the events are joined with ",".
+ val events = if (accessKey.events.isEmpty) None else Some(accessKey.events.mkString(","))
+ sql"""
+ insert into $tableName values(
+ $key,
+ ${accessKey.appid},
+ $events)""".update().apply()
+ Some(key)
+ }
+
+ /** Looks up a single access key by its key string. */
+ def get(key: String): Option[AccessKey] = DB readOnly { implicit session =>
+ sql"SELECT accesskey, appid, events FROM $tableName WHERE accesskey = $key".
+ map(resultToAccessKey).single().apply()
+ }
+
+ /** Returns every access key in the table. */
+ def getAll(): Seq[AccessKey] = DB readOnly { implicit session =>
+ sql"SELECT accesskey, appid, events FROM $tableName".map(resultToAccessKey).list().apply()
+ }
+
+ /** Returns the access keys that belong to the given app id. */
+ def getByAppid(appid: Int): Seq[AccessKey] = DB readOnly { implicit session =>
+ sql"SELECT accesskey, appid, events FROM $tableName WHERE appid = $appid".
+ map(resultToAccessKey).list().apply()
+ }
+
+ /** Overwrites the app id and events of the row whose key equals `accessKey.key`. */
+ def update(accessKey: AccessKey): Unit = DB localTx { implicit session =>
+ // Same encoding as insert: NULL for no events, otherwise comma-separated.
+ val events = if (accessKey.events.isEmpty) None else Some(accessKey.events.mkString(","))
+ sql"""
+ UPDATE $tableName SET
+ appid = ${accessKey.appid},
+ events = $events
+ WHERE accesskey = ${accessKey.key}""".update().apply()
+ }
+
+ /** Deletes the row with the given key string. */
+ def delete(key: String): Unit = DB localTx { implicit session =>
+ sql"DELETE FROM $tableName WHERE accesskey = $key".update().apply()
+ }
+
+ /** Convert JDBC results to [[AccessKey]]
+ *
+ * A NULL `events` column becomes an empty list; otherwise the column is
+ * split on ",".
+ */
+ def resultToAccessKey(rs: WrappedResultSet): AccessKey = {
+ AccessKey(
+ key = rs.string("accesskey"),
+ appid = rs.int("appid"),
+ events = rs.stringOpt("events").map(_.split(",").toSeq).getOrElse(Nil))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala
new file mode 100644
index 0000000..17e6410
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.App
+import org.apache.predictionio.data.storage.Apps
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+ /** JDBC implementation of [[Apps]].
+ *
+ * The backing table is created when an instance is constructed, if it does
+ * not exist yet.
+ */
+ class JDBCApps(client: String, config: StorageClientConfig, prefix: String)
+ extends Apps with Logging {
+ /** Database table name for this data access object */
+ val tableName = JDBCUtils.prefixTableName(prefix, "apps")
+ DB autoCommit { implicit session =>
+ sql"""
+ create table if not exists $tableName (
+ id serial not null primary key,
+ name text not null,
+ description text)""".execute().apply()
+ }
+
+ /** Insert a new app.
+ *
+ * When `app.id` is 0 the id column is left out of the statement so that the
+ * database assigns it; otherwise the supplied id is written as given.
+ *
+ * @param app app to store
+ * @return the key reported by the database for the inserted row
+ */
+ def insert(app: App): Option[Int] = DB localTx { implicit session =>
+ val q = if (app.id == 0) {
+ sql"""
+ insert into $tableName (name, description) values(${app.name}, ${app.description})
+ """
+ } else {
+ sql"""
+ insert into $tableName values(${app.id}, ${app.name}, ${app.description})
+ """
+ }
+ Some(q.updateAndReturnGeneratedKey().apply().toInt)
+ }
+
+ /** Look up an app by id; None when no row matches. */
+ def get(id: Int): Option[App] = DB readOnly { implicit session =>
+ sql"SELECT id, name, description FROM $tableName WHERE id = ${id}".
+ map(resultToApp).single().apply()
+ }
+
+ /** Look up an app by name; None when no row matches. */
+ def getByName(name: String): Option[App] = DB readOnly { implicit session =>
+ sql"SELECT id, name, description FROM $tableName WHERE name = ${name}".
+ map(resultToApp).single().apply()
+ }
+
+ /** Fetch every app stored in the table. */
+ def getAll(): Seq[App] = DB readOnly { implicit session =>
+ sql"SELECT id, name, description FROM $tableName".map(resultToApp).list().apply()
+ }
+
+ /** Overwrite the name and description of the row identified by `app.id`. */
+ def update(app: App): Unit = DB localTx { implicit session =>
+ sql"""
+ update $tableName set name = ${app.name}, description = ${app.description}
+ where id = ${app.id}""".update().apply()
+ }
+
+ /** Delete the row whose id column equals `id`. */
+ def delete(id: Int): Unit = DB localTx { implicit session =>
+ sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+ }
+
+ /** Convert JDBC results to [[App]]; a NULL description becomes None. */
+ private def resultToApp(rs: WrappedResultSet): App = {
+ App(
+ id = rs.int("id"),
+ name = rs.string("name"),
+ description = rs.stringOpt("description"))
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala
new file mode 100644
index 0000000..c9aaca5
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Channel
+import org.apache.predictionio.data.storage.Channels
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+ /** JDBC implementation of [[Channels]].
+ *
+ * The backing table is created when an instance is constructed, if it does
+ * not exist yet.
+ */
+ class JDBCChannels(client: String, config: StorageClientConfig, prefix: String)
+ extends Channels with Logging {
+ /** Database table name for this data access object */
+ val tableName = JDBCUtils.prefixTableName(prefix, "channels")
+ DB autoCommit { implicit session =>
+ sql"""
+ create table if not exists $tableName (
+ id serial not null primary key,
+ name text not null,
+ appid integer not null)""".execute().apply()
+ }
+
+ /** Insert a new channel.
+ *
+ * When `channel.id` is 0 the id column is left out of the statement so that
+ * the database assigns it; otherwise the supplied id is written as given.
+ *
+ * @param channel channel to store
+ * @return the key reported by the database for the inserted row
+ */
+ def insert(channel: Channel): Option[Int] = DB localTx { implicit session =>
+ val q = if (channel.id == 0) {
+ sql"INSERT INTO $tableName (name, appid) VALUES(${channel.name}, ${channel.appid})"
+ } else {
+ sql"INSERT INTO $tableName VALUES(${channel.id}, ${channel.name}, ${channel.appid})"
+ }
+ Some(q.updateAndReturnGeneratedKey().apply().toInt)
+ }
+
+ /** Look up a channel by id; None when no row matches. */
+ // Pure SELECT: use a read-only session like the other JDBC data access objects.
+ def get(id: Int): Option[Channel] = DB readOnly { implicit session =>
+ sql"SELECT id, name, appid FROM $tableName WHERE id = $id".
+ map(resultToChannel).single().apply()
+ }
+
+ /** Fetch all channels that belong to the app with the given id. */
+ // Pure SELECT: use a read-only session like the other JDBC data access objects.
+ def getByAppid(appid: Int): Seq[Channel] = DB readOnly { implicit session =>
+ sql"SELECT id, name, appid FROM $tableName WHERE appid = $appid".
+ map(resultToChannel).list().apply()
+ }
+
+ /** Delete the row whose id column equals `id`. */
+ def delete(id: Int): Unit = DB localTx { implicit session =>
+ sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+ }
+
+ /** Convert JDBC results to [[Channel]] */
+ def resultToChannel(rs: WrappedResultSet): Channel = {
+ Channel(
+ id = rs.int("id"),
+ name = rs.string("name"),
+ appid = rs.int("appid"))
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala
new file mode 100644
index 0000000..13c374d
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineInstance
+import org.apache.predictionio.data.storage.EngineInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+ /** JDBC implementation of [[EngineInstances]].
+ *
+ * The backing table is created when an instance is constructed, if it does
+ * not exist yet.
+ */
+ class JDBCEngineInstances(client: String, config: StorageClientConfig, prefix: String)
+ extends EngineInstances with Logging {
+ /** Database table name for this data access object */
+ val tableName = JDBCUtils.prefixTableName(prefix, "engineinstances")
+
+ /** Column list shared by every SELECT below, so that all queries return
+ * exactly the columns that `resultToEngineInstance` reads.
+ */
+ private val selectColumns = sqls"""
+ id,
+ status,
+ startTime,
+ endTime,
+ engineId,
+ engineVersion,
+ engineVariant,
+ engineFactory,
+ batch,
+ env,
+ sparkConf,
+ datasourceParams,
+ preparatorParams,
+ algorithmsParams,
+ servingParams"""
+
+ DB autoCommit { implicit session =>
+ sql"""
+ create table if not exists $tableName (
+ id varchar(100) not null primary key,
+ status text not null,
+ startTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ endTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ engineId text not null,
+ engineVersion text not null,
+ engineVariant text not null,
+ engineFactory text not null,
+ batch text not null,
+ env text not null,
+ sparkConf text not null,
+ datasourceParams text not null,
+ preparatorParams text not null,
+ algorithmsParams text not null,
+ servingParams text not null)""".execute().apply()
+ }
+
+ /** Insert a new engine instance.
+ *
+ * A fresh random UUID is generated and stored as the row id; the id carried
+ * by `i` is not written.
+ *
+ * @param i engine instance to store
+ * @return the generated id
+ */
+ def insert(i: EngineInstance): String = DB localTx { implicit session =>
+ val id = java.util.UUID.randomUUID().toString
+ // Values are positional, so their order must follow the table definition.
+ sql"""
+ INSERT INTO $tableName VALUES(
+ $id,
+ ${i.status},
+ ${i.startTime},
+ ${i.endTime},
+ ${i.engineId},
+ ${i.engineVersion},
+ ${i.engineVariant},
+ ${i.engineFactory},
+ ${i.batch},
+ ${JDBCUtils.mapToString(i.env)},
+ ${JDBCUtils.mapToString(i.sparkConf)},
+ ${i.dataSourceParams},
+ ${i.preparatorParams},
+ ${i.algorithmsParams},
+ ${i.servingParams})""".update().apply()
+ id
+ }
+
+ /** Look up an engine instance by id; None when no row matches. */
+ def get(id: String): Option[EngineInstance] = DB readOnly { implicit session =>
+ sql"""
+ SELECT $selectColumns
+ FROM $tableName WHERE id = $id""".map(resultToEngineInstance).
+ single().apply()
+ }
+
+ /** Fetch every engine instance stored in the table. */
+ def getAll(): Seq[EngineInstance] = DB readOnly { implicit session =>
+ sql"""
+ SELECT $selectColumns
+ FROM $tableName""".map(resultToEngineInstance).list().apply()
+ }
+
+ /** Most recently started completed instance for the given engine, if any.
+ *
+ * This is the first element of [[getCompleted]].
+ */
+ def getLatestCompleted(
+ engineId: String,
+ engineVersion: String,
+ engineVariant: String): Option[EngineInstance] =
+ getCompleted(engineId, engineVersion, engineVariant).headOption
+
+ /** All instances with status 'COMPLETED' for the given engine id, version and
+ * variant, ordered by start time, newest first.
+ */
+ def getCompleted(
+ engineId: String,
+ engineVersion: String,
+ engineVariant: String): Seq[EngineInstance] = DB readOnly { implicit s =>
+ sql"""
+ SELECT $selectColumns
+ FROM $tableName
+ WHERE
+ status = 'COMPLETED' AND
+ engineId = $engineId AND
+ engineVersion = $engineVersion AND
+ engineVariant = $engineVariant
+ ORDER BY startTime DESC""".
+ map(resultToEngineInstance).list().apply()
+ }
+
+ /** Overwrite every non-id column of the row identified by `i.id`. */
+ def update(i: EngineInstance): Unit = DB localTx { implicit session =>
+ sql"""
+ update $tableName set
+ status = ${i.status},
+ startTime = ${i.startTime},
+ endTime = ${i.endTime},
+ engineId = ${i.engineId},
+ engineVersion = ${i.engineVersion},
+ engineVariant = ${i.engineVariant},
+ engineFactory = ${i.engineFactory},
+ batch = ${i.batch},
+ env = ${JDBCUtils.mapToString(i.env)},
+ sparkConf = ${JDBCUtils.mapToString(i.sparkConf)},
+ datasourceParams = ${i.dataSourceParams},
+ preparatorParams = ${i.preparatorParams},
+ algorithmsParams = ${i.algorithmsParams},
+ servingParams = ${i.servingParams}
+ where id = ${i.id}""".update().apply()
+ }
+
+ /** Delete the row whose id column equals `id`. */
+ def delete(id: String): Unit = DB localTx { implicit session =>
+ sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+ }
+
+ /** Convert JDBC results to [[EngineInstance]] */
+ def resultToEngineInstance(rs: WrappedResultSet): EngineInstance = {
+ EngineInstance(
+ id = rs.string("id"),
+ status = rs.string("status"),
+ startTime = rs.jodaDateTime("startTime"),
+ endTime = rs.jodaDateTime("endTime"),
+ engineId = rs.string("engineId"),
+ engineVersion = rs.string("engineVersion"),
+ engineVariant = rs.string("engineVariant"),
+ engineFactory = rs.string("engineFactory"),
+ batch = rs.string("batch"),
+ env = JDBCUtils.stringToMap(rs.string("env")),
+ sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")),
+ dataSourceParams = rs.string("datasourceParams"),
+ preparatorParams = rs.string("preparatorParams"),
+ algorithmsParams = rs.string("algorithmsParams"),
+ servingParams = rs.string("servingParams"))
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala
new file mode 100644
index 0000000..90eb5f3
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EvaluationInstance
+import org.apache.predictionio.data.storage.EvaluationInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+ /** JDBC implementations of [[EvaluationInstances]].
+ *
+ * The backing table is created when an instance is constructed, if it does
+ * not exist yet.
+ */
+ class JDBCEvaluationInstances(client: String, config: StorageClientConfig, prefix: String)
+ extends EvaluationInstances with Logging {
+ /** Database table name for this data access object */
+ val tableName = JDBCUtils.prefixTableName(prefix, "evaluationinstances")
+
+ /** Column list shared by every SELECT below, so that all queries return
+ * exactly the columns that `resultToEvaluationInstance` reads.
+ */
+ private val selectColumns = sqls"""
+ id,
+ status,
+ startTime,
+ endTime,
+ evaluationClass,
+ engineParamsGeneratorClass,
+ batch,
+ env,
+ sparkConf,
+ evaluatorResults,
+ evaluatorResultsHTML,
+ evaluatorResultsJSON"""
+
+ DB autoCommit { implicit session =>
+ sql"""
+ create table if not exists $tableName (
+ id varchar(100) not null primary key,
+ status text not null,
+ startTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ endTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ evaluationClass text not null,
+ engineParamsGeneratorClass text not null,
+ batch text not null,
+ env text not null,
+ sparkConf text not null,
+ evaluatorResults text not null,
+ evaluatorResultsHTML text not null,
+ evaluatorResultsJSON text)""".execute().apply()
+ }
+
+ /** Insert a new evaluation instance.
+ *
+ * A fresh random UUID is generated and stored as the row id; the id carried
+ * by `i` is not written.
+ *
+ * @param i evaluation instance to store
+ * @return the generated id
+ */
+ def insert(i: EvaluationInstance): String = DB localTx { implicit session =>
+ val id = java.util.UUID.randomUUID().toString
+ // Values are positional, so their order must follow the table definition.
+ sql"""
+ INSERT INTO $tableName VALUES(
+ $id,
+ ${i.status},
+ ${i.startTime},
+ ${i.endTime},
+ ${i.evaluationClass},
+ ${i.engineParamsGeneratorClass},
+ ${i.batch},
+ ${JDBCUtils.mapToString(i.env)},
+ ${JDBCUtils.mapToString(i.sparkConf)},
+ ${i.evaluatorResults},
+ ${i.evaluatorResultsHTML},
+ ${i.evaluatorResultsJSON})""".update().apply()
+ id
+ }
+
+ /** Look up an evaluation instance by id; None when no row matches. */
+ def get(id: String): Option[EvaluationInstance] = DB readOnly { implicit session =>
+ sql"""
+ SELECT $selectColumns
+ FROM $tableName WHERE id = $id
+ """.map(resultToEvaluationInstance).single().apply()
+ }
+
+ /** Fetch every evaluation instance stored in the table. */
+ def getAll(): Seq[EvaluationInstance] = DB readOnly { implicit session =>
+ sql"""
+ SELECT $selectColumns
+ FROM $tableName
+ """.map(resultToEvaluationInstance).list().apply()
+ }
+
+ /** All instances with status 'EVALCOMPLETED', ordered by start time, newest first. */
+ def getCompleted(): Seq[EvaluationInstance] = DB readOnly { implicit s =>
+ sql"""
+ SELECT $selectColumns
+ FROM $tableName
+ WHERE
+ status = 'EVALCOMPLETED'
+ ORDER BY startTime DESC
+ """.map(resultToEvaluationInstance).list().apply()
+ }
+
+ /** Overwrite every non-id column of the row identified by `i.id`. */
+ def update(i: EvaluationInstance): Unit = DB localTx { implicit session =>
+ sql"""
+ update $tableName set
+ status = ${i.status},
+ startTime = ${i.startTime},
+ endTime = ${i.endTime},
+ evaluationClass = ${i.evaluationClass},
+ engineParamsGeneratorClass = ${i.engineParamsGeneratorClass},
+ batch = ${i.batch},
+ env = ${JDBCUtils.mapToString(i.env)},
+ sparkConf = ${JDBCUtils.mapToString(i.sparkConf)},
+ evaluatorResults = ${i.evaluatorResults},
+ evaluatorResultsHTML = ${i.evaluatorResultsHTML},
+ evaluatorResultsJSON = ${i.evaluatorResultsJSON}
+ where id = ${i.id}""".update().apply()
+ }
+
+ /** Delete the row whose id column equals `id`. */
+ def delete(id: String): Unit = DB localTx { implicit session =>
+ sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+ }
+
+ /** Convert JDBC results to [[EvaluationInstance]] */
+ def resultToEvaluationInstance(rs: WrappedResultSet): EvaluationInstance = {
+ EvaluationInstance(
+ id = rs.string("id"),
+ status = rs.string("status"),
+ startTime = rs.jodaDateTime("startTime"),
+ endTime = rs.jodaDateTime("endTime"),
+ evaluationClass = rs.string("evaluationClass"),
+ engineParamsGeneratorClass = rs.string("engineParamsGeneratorClass"),
+ batch = rs.string("batch"),
+ env = JDBCUtils.stringToMap(rs.string("env")),
+ sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")),
+ evaluatorResults = rs.string("evaluatorResults"),
+ evaluatorResultsHTML = rs.string("evaluatorResultsHTML"),
+ // evaluatorResultsJSON is the only column declared without "not null", so
+ // a NULL value is mapped to an empty string instead of leaking a null.
+ evaluatorResultsJSON = rs.stringOpt("evaluatorResultsJSON").getOrElse(""))
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
new file mode 100644
index 0000000..dddef67
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+import org.json4s.JObject
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+import scalikejdbc._
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+ /** JDBC implementation of [[LEvents]].
+ *
+ * Events are kept in one table per app/channel pair; the table name is
+ * derived through `JDBCUtils.eventTableName`.
+ */
+ class JDBCLEvents(
+ client: String,
+ config: StorageClientConfig,
+ namespace: String) extends LEvents with Logging {
+ implicit private val formats = org.json4s.DefaultFormats
+
+ /** Column list shared by the event SELECT statements, so that they return
+ * exactly the columns that `resultToEvent` reads.
+ */
+ private val eventColumns = sqls"""
+ id,
+ event,
+ entityType,
+ entityId,
+ targetEntityType,
+ targetEntityId,
+ properties,
+ eventTime,
+ eventTimeZone,
+ tags,
+ prId,
+ creationTime,
+ creationTimeZone"""
+
+ /** Create the event table for an app/channel pair if it does not exist.
+ *
+ * When the storage property `INDEX` equals "enabled" (case-insensitive), the
+ * event, entityType and entityId columns are created as varchar(255) and
+ * indexes are added on entityId and entityType; otherwise those columns are
+ * unbounded text and no index is created.
+ *
+ * @param appId app the table belongs to
+ * @param channelId optional channel the table belongs to
+ * @return true once all statements have been executed
+ */
+ def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+
+ // To use index, it must be varchar less than 255 characters on a VARCHAR column
+ val useIndex = config.properties.get("INDEX").exists(_.equalsIgnoreCase("enabled"))
+ // The two table layouts differ only in the type of the three key columns.
+ val keyColumnType = if (useIndex) "varchar(255)" else "text"
+
+ val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
+ val entityIdIndexName = s"idx_${tableName}_ei"
+ val entityTypeIndexName = s"idx_${tableName}_et"
+ DB autoCommit { implicit session =>
+ SQL(s"""
+ create table if not exists $tableName (
+ id varchar(32) not null primary key,
+ event $keyColumnType not null,
+ entityType $keyColumnType not null,
+ entityId $keyColumnType not null,
+ targetEntityType text,
+ targetEntityId text,
+ properties text,
+ eventTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ eventTimeZone varchar(50) not null,
+ tags text,
+ prId text,
+ creationTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ creationTimeZone varchar(50) not null)""").execute().apply()
+
+ if (useIndex) {
+ // create index
+ // NOTE(review): unlike the table, the indexes are not guarded by
+ // "if not exists"; presumably a second init on an existing table fails
+ // here -- confirm against the supported databases.
+ SQL(s"create index $entityIdIndexName on $tableName (entityId)").execute().apply()
+ SQL(s"create index $entityTypeIndexName on $tableName (entityType)").execute().apply()
+ }
+ true
+ }
+ }
+
+ /** Drop the event table of an app/channel pair.
+ *
+ * @return true once the drop statement has been executed
+ */
+ def remove(appId: Int, channelId: Option[Int] = None): Boolean =
+ DB autoCommit { implicit session =>
+ SQL(s"""
+ drop table ${JDBCUtils.eventTableName(namespace, appId, channelId)}
+ """).execute().apply()
+ true
+ }
+
+ /** Close all ScalikeJDBC connection pools. */
+ def close(): Unit = ConnectionPool.closeAll()
+
+ /** Insert one event asynchronously.
+ *
+ * The row id is `event.eventId` when present, otherwise a generated id.
+ *
+ * @return a future holding the id the event was stored under
+ */
+ def futureInsert(event: Event, appId: Int, channelId: Option[Int])(
+ implicit ec: ExecutionContext): Future[String] = Future {
+ DB localTx { implicit session =>
+ val id = event.eventId.getOrElse(JDBCUtils.generateId)
+ val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+ // Values are positional, so their order must follow the table definition.
+ sql"""
+ insert into $tableName values(
+ $id,
+ ${event.event},
+ ${event.entityType},
+ ${event.entityId},
+ ${event.targetEntityType},
+ ${event.targetEntityId},
+ ${write(event.properties.toJObject)},
+ ${event.eventTime},
+ ${event.eventTime.getZone.getID},
+ ${if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None},
+ ${event.prId},
+ ${event.creationTime},
+ ${event.creationTime.getZone.getID}
+ )
+ """.update().apply()
+ id
+ }
+ }
+
+ /** Look up one event by id asynchronously; the future holds None when no row matches. */
+ def futureGet(eventId: String, appId: Int, channelId: Option[Int])(
+ implicit ec: ExecutionContext): Future[Option[Event]] = Future {
+ DB readOnly { implicit session =>
+ val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+ sql"""
+ select $eventColumns
+ from $tableName
+ where id = $eventId
+ """.map(resultToEvent).single().apply()
+ }
+ }
+
+ /** Delete one event by id asynchronously.
+ *
+ * @return a future holding true whenever the statement completes, whether or
+ * not a row matched `eventId`
+ */
+ def futureDelete(eventId: String, appId: Int, channelId: Option[Int])(
+ implicit ec: ExecutionContext): Future[Boolean] = Future {
+ DB localTx { implicit session =>
+ val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+ sql"""
+ delete from $tableName where id = $eventId
+ """.update().apply()
+ true
+ }
+ }
+
+ /** Query events asynchronously.
+ *
+ * Every filter that is None is left out of the WHERE clause. For
+ * `targetEntityType` and `targetEntityId`, Some(None) matches rows where the
+ * column IS NULL. Results are ordered by eventTime, descending only when
+ * `reversed` is Some(true). A negative `limit` means no limit.
+ *
+ * @return a future holding an iterator over the fully fetched result list
+ */
+ def futureFind(
+ appId: Int,
+ channelId: Option[Int] = None,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ entityType: Option[String] = None,
+ entityId: Option[String] = None,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None,
+ limit: Option[Int] = None,
+ reversed: Option[Boolean] = None
+ )(implicit ec: ExecutionContext): Future[Iterator[Event]] = Future {
+ DB readOnly { implicit session =>
+ val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+ val whereClause = sqls.toAndConditionOpt(
+ startTime.map(x => sqls"eventTime >= $x"),
+ untilTime.map(x => sqls"eventTime < $x"),
+ entityType.map(x => sqls"entityType = $x"),
+ entityId.map(x => sqls"entityId = $x"),
+ // Any of the given event names may match; an empty list adds no condition.
+ eventNames.flatMap(names =>
+ sqls.toOrConditionOpt(names.map(name =>
+ Some(sqls"event = $name")
+ ): _*)
+ ),
+ targetEntityType.map(x => x.map(y => sqls"targetEntityType = $y")
+ .getOrElse(sqls"targetEntityType IS NULL")),
+ targetEntityId.map(x => x.map(y => sqls"targetEntityId = $y")
+ .getOrElse(sqls"targetEntityId IS NULL"))
+ ).map(sqls.where(_)).getOrElse(sqls"")
+ val orderByClause =
+ if (reversed.getOrElse(false)) sqls"eventTime desc" else sqls"eventTime asc"
+ val limitClause = limit.filter(_ >= 0).map(n => sqls.limit(n)).getOrElse(sqls"")
+ val q = sql"""
+ select $eventColumns
+ from $tableName
+ $whereClause
+ order by $orderByClause
+ $limitClause
+ """
+ q.map(resultToEvent).list().apply().toIterator
+ }
+ }
+
+ /** Convert JDBC results to [[Event]].
+ *
+ * NULL properties become an empty [[DataMap]] and NULL tags an empty list.
+ * Event and creation times are rebuilt in the time zone stored alongside them.
+ */
+ private[predictionio] def resultToEvent(rs: WrappedResultSet): Event = {
+ Event(
+ eventId = rs.stringOpt("id"),
+ event = rs.string("event"),
+ entityType = rs.string("entityType"),
+ entityId = rs.string("entityId"),
+ targetEntityType = rs.stringOpt("targetEntityType"),
+ targetEntityId = rs.stringOpt("targetEntityId"),
+ properties = rs.stringOpt("properties").map(p =>
+ DataMap(read[JObject](p))).getOrElse(DataMap()),
+ eventTime = new DateTime(rs.jodaDateTime("eventTime"),
+ DateTimeZone.forID(rs.string("eventTimeZone"))),
+ tags = rs.stringOpt("tags").map(t => t.split(",").toList).getOrElse(Nil),
+ prId = rs.stringOpt("prId"),
+ creationTime = new DateTime(rs.jodaDateTime("creationTime"),
+ DateTimeZone.forID(rs.string("creationTimeZone")))
+ )
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
new file mode 100644
index 0000000..b48502a
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+ /** JDBC implementation of [[Models]].
+ *
+ * The backing table is created when an instance is constructed, if it does
+ * not exist yet.
+ */
+ class JDBCModels(client: String, config: StorageClientConfig, prefix: String)
+ extends Models with Logging {
+ /** Database table name for this data access object */
+ val tableName = JDBCUtils.prefixTableName(prefix, "models")
+
+ /** Determines binary column type based on JDBC driver type */
+ val binaryColumnType = JDBCUtils.binaryColumnType(client)
+ DB autoCommit { implicit session =>
+ sql"""
+ create table if not exists $tableName (
+ id varchar(100) not null primary key,
+ models $binaryColumnType not null)""".execute().apply()
+ }
+
+ /** Store a model under its id. */
+ def insert(i: Model): Unit = DB localTx { implicit session =>
+ sql"insert into $tableName values(${i.id}, ${i.models})".update().apply()
+ }
+
+ /** Look up a model by id; None when no row matches. */
+ def get(id: String): Option[Model] = DB readOnly { implicit session =>
+ sql"select id, models from $tableName where id = $id".map { r =>
+ Model(id = r.string("id"), models = r.bytes("models"))
+ }.single().apply()
+ }
+
+ /** Delete the row whose id column equals `id`. */
+ def delete(id: String): Unit = DB localTx { implicit session =>
+ // update() rather than execute(), matching the other write statements.
+ sql"delete from $tableName where id = $id".update().apply()
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
new file mode 100644
index 0000000..2e6ee83
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import java.sql.{DriverManager, ResultSet}
+
+import com.github.nscala_time.time.Imports._
+import org.apache.predictionio.data.storage.{DataMap, Event, PEvents, StorageClientConfig}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.{JdbcRDD, RDD}
+import org.apache.spark.sql.{SQLContext, SaveMode}
+import org.json4s.JObject
+import org.json4s.native.Serialization
+import scalikejdbc._
+
+/** JDBC implementation of [[PEvents]]
+  *
+  * @param client JDBC connection URL
+  * @param config Storage client configuration; the `USERNAME` and `PASSWORD`
+  *               properties and the optional `PARTITIONS` property are read
+  * @param namespace Namespace used to derive event table names
+  */
+class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String) extends PEvents {
+  @transient private implicit lazy val formats = org.json4s.DefaultFormats
+
+  /** Reads events of an app (and optional channel) into a cached RDD.
+    *
+    * The event table is scanned through a JdbcRDD that is partitioned on
+    * eventTime. Time bounds are passed to the database in whole seconds. Every
+    * other argument, when defined, adds an equality filter to the where clause.
+    *
+    * @param appId App ID, part of the event table name
+    * @param channelId Optional channel ID, part of the event table name
+    * @param startTime Lower bound on eventTime; epoch 0 when absent
+    * @param untilTime Upper bound on eventTime; one year from now when absent
+    * @param entityType Keep only events of this entity type
+    * @param entityId Keep only events of this entity ID
+    * @param eventNames Keep only events whose name is one of these
+    * @param targetEntityType Some(Some(x)) keeps rows equal to x, Some(None)
+    *                         keeps rows where the column is null, None does not
+    *                         filter
+    * @param targetEntityId Same convention as targetEntityType
+    * @param sc Spark context used to create the RDD
+    * @return Cached RDD of matching events
+    */
+  def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = {
+    // JdbcRDD reserves the two `?` placeholders of the statement for the
+    // partition bounds, so filter values have to be inlined. Doubling single
+    // quotes keeps values such as O'Brien from breaking or altering the query.
+    // NOTE(review): backslashes are passed through unchanged, as before.
+    def quoted(value: String): String = "'" + value.replace("'", "''") + "'"
+
+    val lower = startTime.map(_.getMillis).getOrElse(0.toLong)
+    /** Change the default upper bound from +100 to +1 year because MySQL's
+      * FROM_UNIXTIME(t) will return NULL if we use +100 years.
+      */
+    val upper = untilTime.map(_.getMillis).getOrElse((DateTime.now + 1.years).getMillis)
+    // A range shorter than one day has 0 standard days, and an RDD with 0
+    // partitions contains no rows at all. Always use at least one partition.
+    val par = scala.math.max(
+      1L,
+      scala.math.min(
+        new Duration(upper - lower).getStandardDays,
+        config.properties.getOrElse("PARTITIONS", "4").toLong)).toInt
+    val entityTypeClause =
+      entityType.map(x => s"and entityType = ${quoted(x)}").getOrElse("")
+    val entityIdClause =
+      entityId.map(x => s"and entityId = ${quoted(x)}").getOrElse("")
+    val eventNamesClause =
+      eventNames.map(
+        "and (" + _.map(y => s"event = ${quoted(y)}").mkString(" or ") + ")").getOrElse("")
+    val targetEntityTypeClause = targetEntityType.map(
+      _.map(x => s"and targetEntityType = ${quoted(x)}"
+      ).getOrElse("and targetEntityType is null")).getOrElse("")
+    val targetEntityIdClause = targetEntityId.map(
+      _.map(x => s"and targetEntityId = ${quoted(x)}"
+      ).getOrElse("and targetEntityId is null")).getOrElse("")
+    val q = s"""
+ select
+ id,
+ event,
+ entityType,
+ entityId,
+ targetEntityType,
+ targetEntityId,
+ properties,
+ eventTime,
+ eventTimeZone,
+ tags,
+ prId,
+ creationTime,
+ creationTimeZone
+ from ${JDBCUtils.eventTableName(namespace, appId, channelId)}
+ where
+ eventTime >= ${JDBCUtils.timestampFunction(client)}(?) and
+ eventTime < ${JDBCUtils.timestampFunction(client)}(?)
+ $entityTypeClause
+ $entityIdClause
+ $eventNamesClause
+ $targetEntityTypeClause
+ $targetEntityIdClause
+ """.replace("\n", " ")
+    new JdbcRDD(
+      sc,
+      () => {
+        DriverManager.getConnection(
+          client,
+          config.properties("USERNAME"),
+          config.properties("PASSWORD"))
+      },
+      q,
+      // The timestamp conversion functions take seconds, not milliseconds.
+      lower / 1000,
+      upper / 1000,
+      par,
+      (r: ResultSet) => {
+        Event(
+          eventId = Option(r.getString("id")),
+          event = r.getString("event"),
+          entityType = r.getString("entityType"),
+          entityId = r.getString("entityId"),
+          targetEntityType = Option(r.getString("targetEntityType")),
+          targetEntityId = Option(r.getString("targetEntityId")),
+          properties = Option(r.getString("properties")).map(x =>
+            DataMap(Serialization.read[JObject](x))).getOrElse(DataMap()),
+          eventTime = new DateTime(r.getTimestamp("eventTime").getTime,
+            DateTimeZone.forID(r.getString("eventTimeZone"))),
+          tags = Option(r.getString("tags")).map(x =>
+            x.split(",").toList).getOrElse(Nil),
+          prId = Option(r.getString("prId")),
+          creationTime = new DateTime(r.getTimestamp("creationTime").getTime,
+            DateTimeZone.forID(r.getString("creationTimeZone"))))
+      }).cache()
+  }
+
+  /** Appends events to the event table of the given app and channel.
+    *
+    * Every event is written under a freshly generated ID.
+    *
+    * @param events Events to store
+    * @param appId App ID, part of the event table name
+    * @param channelId Optional channel ID, part of the event table name
+    * @param sc Spark context used to build the DataFrame
+    */
+  def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val sqlContext = new SQLContext(sc)
+
+    import sqlContext.implicits._
+
+    val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
+
+    val eventTableColumns = Seq[String](
+      "id"
+      , "event"
+      , "entityType"
+      , "entityId"
+      , "targetEntityType"
+      , "targetEntityId"
+      , "properties"
+      , "eventTime"
+      , "eventTimeZone"
+      , "tags"
+      , "prId"
+      , "creationTime"
+      , "creationTimeZone")
+
+    // NOTE(review): this copy drops any incoming eventId and sets creationTime
+    // to the event's eventTime rather than its own creationTime. Kept as is;
+    // confirm that both are intended.
+    val eventDF = events.map(x =>
+      Event(eventId = None, event = x.event, entityType = x.entityType,
+        entityId = x.entityId, targetEntityType = x.targetEntityType,
+        targetEntityId = x.targetEntityId, properties = x.properties,
+        eventTime = x.eventTime, tags = x.tags, prId= x.prId,
+        creationTime = x.eventTime)
+      )
+      .map { event =>
+        (event.eventId.getOrElse(JDBCUtils.generateId)
+        , event.event
+        , event.entityType
+        , event.entityId
+        , event.targetEntityType.orNull
+        , event.targetEntityId.orNull
+        , if (!event.properties.isEmpty) Serialization.write(event.properties.toJObject) else null
+        , new java.sql.Timestamp(event.eventTime.getMillis)
+        , event.eventTime.getZone.getID
+        // None, not a null reference typed as Option, marks the absence of tags.
+        , if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None
+        , event.prId
+        , new java.sql.Timestamp(event.creationTime.getMillis)
+        , event.creationTime.getZone.getID)
+      }.toDF(eventTableColumns:_*)
+
+    // spark version 1.4.0 or higher
+    val prop = new java.util.Properties
+    prop.setProperty("user", config.properties("USERNAME"))
+    prop.setProperty("password", config.properties("PASSWORD"))
+    eventDF.write.mode(SaveMode.Append).jdbc(client, tableName, prop)
+  }
+
+  /** Deletes the events with the given IDs, one local transaction per event.
+    *
+    * @param eventIds IDs of the events to delete
+    * @param appId App ID, part of the event table name
+    * @param channelId Optional channel ID, part of the event table name
+    * @param sc Spark context (not used by this implementation)
+    */
+  def delete(eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+
+    eventIds.foreachPartition{ iter =>
+      // The table is the same for every ID, so derive it once per partition.
+      // It is built inside the partition closure so that only plain values,
+      // not the SQLSyntax object, have to be shipped with the closure.
+      val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
+      val table = SQLSyntax.createUnsafely(tableName)
+
+      iter.foreach { eventId =>
+        DB localTx { implicit session =>
+          sql"""
+ delete from $table where id = $eventId
+ """.update().apply()
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
new file mode 100644
index 0000000..3eb55ba
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import scalikejdbc._
+
+/** JDBC related utilities */
+object JDBCUtils {
+  /** Extract JDBC driver type from URL
+    *
+    * @param url JDBC URL
+    * @return The driver type, e.g. postgresql, or an empty string when the URL
+    *         does not contain a `jdbc:<type>:` section
+    */
+  def driverType(url: String): String = {
+    val capture = """jdbc:([^:]+):""".r
+    capture.findFirstMatchIn(url).map(_.group(1)).getOrElse("")
+  }
+
+  /** Determines binary column type from JDBC URL
+    *
+    * @param url JDBC URL
+    * @return Binary column type as SQLSyntax, e.g. LONGBLOB
+    */
+  def binaryColumnType(url: String): SQLSyntax = {
+    driverType(url) match {
+      case "postgresql" => sqls"bytea"
+      case "mysql" => sqls"longblob"
+      case _ => sqls"longblob"
+    }
+  }
+
+  /** Determines UNIX timestamp conversion function from JDBC URL
+    *
+    * @param url JDBC URL
+    * @return Timestamp conversion function, e.g. TO_TIMESTAMP
+    */
+  def timestampFunction(url: String): String = {
+    driverType(url) match {
+      case "postgresql" => "to_timestamp"
+      case "mysql" => "from_unixtime"
+      case _ => "from_unixtime"
+    }
+  }
+
+  /** Converts Map of String to String to comma-separated list of key=value
+    *
+    * @param m Map of String to String
+    * @return Comma-separated list, e.g. FOO=BAR,X=Y,...
+    */
+  def mapToString(m: Map[String, String]): String = {
+    m.map(t => s"${t._1}=${t._2}").mkString(",")
+  }
+
+  /** Inverse of mapToString
+    *
+    * An empty string yields an empty map, so the result of mapToString on an
+    * empty map can be read back. An entry is split at its first `=` only, so
+    * values may contain `=`; an entry without `=` or with nothing after it maps
+    * the key to an empty string. Keys and values that contain a comma cannot
+    * be represented in this format.
+    *
+    * @param str Comma-separated list, e.g. FOO=BAR,X=Y,...
+    * @return Map of String to String, e.g. Map("FOO" -> "BAR", "X" -> "Y", ...)
+    */
+  def stringToMap(str: String): Map[String, String] = {
+    str.split(",").filter(_.nonEmpty).map { entry =>
+      // A limit of 2 keeps any further '=' characters inside the value and
+      // preserves an empty value after a trailing '='.
+      entry.split("=", 2) match {
+        case Array(key, value) => key -> value
+        case Array(key) => key -> ""
+      }
+    }.toMap[String, String]
+  }
+
+  /** Generate 32-character random ID using UUID with - stripped */
+  def generateId: String = java.util.UUID.randomUUID().toString.replace("-", "")
+
+  /** Prefix a table name
+    *
+    * The name is turned into SQL syntax without any escaping, so both parts
+    * must come from trusted configuration.
+    *
+    * @param prefix Table prefix
+    * @param table Table name
+    * @return Prefixed table name
+    */
+  def prefixTableName(prefix: String, table: String): SQLSyntax =
+    sqls.createUnsafely(s"${prefix}_$table")
+
+  /** Derive event table name
+    *
+    * @param namespace Namespace of event tables
+    * @param appId App ID
+    * @param channelId Optional channel ID
+    * @return Full event table name
+    */
+  def eventTableName(namespace: String, appId: Int, channelId: Option[Int]): String =
+    s"${namespace}_${appId}${channelId.map("_" + _).getOrElse("")}"
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
new file mode 100644
index 0000000..661e05e
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import scalikejdbc._
+
+/** JDBC implementation of [[BaseStorageClient]]
+  *
+  * Construction checks that the `URL`, `USERNAME` and `PASSWORD` properties
+  * are present and then initializes the ScalikeJDBC singleton connection pool
+  * with them. The optional `CONNECTIONS` property sets the maximum pool size
+  * and defaults to 8.
+  *
+  * @param config Storage client configuration holding the properties above
+  */
+class StorageClient(val config: StorageClientConfig)
+  extends BaseStorageClient with Logging {
+  override val prefix = "JDBC"
+
+  // Mandatory properties, checked in this order; the first one that is missing
+  // aborts construction with its own message.
+  Seq(
+    "URL" -> "The URL variable is not set!",
+    "USERNAME" -> "The USERNAME variable is not set!",
+    "PASSWORD" -> "The PASSWORD variable is not set!"
+  ).foreach { case (key, message) =>
+    if (!config.properties.contains(key)) {
+      throw new StorageClientException(message, null)
+    }
+  }
+
+  // set max size of connection pool
+  val maxSize: Int = config.properties.get("CONNECTIONS").map(_.toInt).getOrElse(8)
+  val settings = ConnectionPoolSettings(maxSize = maxSize)
+
+  locally {
+    val properties = config.properties
+    ConnectionPool.singleton(
+      properties("URL"),
+      properties("USERNAME"),
+      properties("PASSWORD"),
+      settings)
+  }
+
+  /** JDBC connection URL. Connections are managed by ScalikeJDBC. */
+  val client = config.properties("URL")
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
new file mode 100644
index 0000000..e552e54
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage
+
+/** JDBC implementation of the storage traits, supporting metadata, event data,
+ * and model data.
+ *
+ * @group Implementation
+ */
+package object jdbc {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/resources/application.conf b/storage/jdbc/src/test/resources/application.conf
new file mode 100644
index 0000000..eecae44
--- /dev/null
+++ b/storage/jdbc/src/test/resources/application.conf
@@ -0,0 +1,28 @@
+org.apache.predictionio.data.storage {
+ sources {
+ mongodb {
+ type = mongodb
+ hosts = [localhost]
+ ports = [27017]
+ }
+ elasticsearch {
+ type = elasticsearch
+ hosts = [localhost]
+ ports = [9300]
+ }
+ }
+ repositories {
+ # This section is a dummy that exists only to make storage happy.
+ # The actual testing will not bypass these repository settings completely.
+ # Please refer to StorageTestUtils.scala.
+ settings {
+ name = "test_predictionio"
+ source = mongodb
+ }
+
+ appdata {
+ name = "test_predictionio_appdata"
+ source = mongodb
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/localfs/.gitignore
----------------------------------------------------------------------
diff --git a/storage/localfs/.gitignore b/storage/localfs/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/storage/localfs/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/localfs/build.sbt
----------------------------------------------------------------------
diff --git a/storage/localfs/build.sbt b/storage/localfs/build.sbt
new file mode 100644
index 0000000..2cf9977
--- /dev/null
+++ b/storage/localfs/build.sbt
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// sbt settings of the module published as apache-predictionio-data-localfs.
+name := "apache-predictionio-data-localfs"
+
+// The core and data modules are declared in the "provided" scope, so they are
+// compiled against but not bundled; ScalaTest and specs2 are test-only.
+libraryDependencies ++= Seq(
+ "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
+ "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided",
+ "org.scalatest" %% "scalatest" % "2.1.7" % "test",
+ "org.specs2" %% "specs2" % "2.3.13" % "test")
+
+// Run the test suites one at a time.
+parallelExecution in Test := false
+
+// childrenPomExtra is not defined in this file; presumably it comes from the
+// root build definition (verify).
+pomExtra := childrenPomExtra.value
+
+// The assembly jar bundles library dependencies but leaves out the Scala
+// library itself.
+assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true)
+
+// Concatenate colliding META-INF/LICENSE.txt and META-INF/NOTICE.txt entries;
+// every other path falls back to the previously configured merge strategy.
+assemblyMergeStrategy in assembly := {
+ case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
+ case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat
+ case x =>
+ val oldStrategy = (assemblyMergeStrategy in assembly).value
+ oldStrategy(x)
+}
+
+// skip test in assembly
+test in assembly := {}
+
+// Write the jar to assembly/spark/ under the directory two levels above this
+// module's base directory, with the version in the file name.
+outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-localfs-assembly-" + version.value + ".jar")
+
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/localfs/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
----------------------------------------------------------------------
diff --git a/storage/localfs/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala b/storage/localfs/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
new file mode 100644
index 0000000..f528af9
--- /dev/null
+++ b/storage/localfs/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.localfs
+
+import java.io.File
+import java.io.FileNotFoundException
+import java.io.FileOutputStream
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+
+import scala.io.Source
+
+/** Local file system implementation of [[Models]]
+  *
+  * Each model is kept in its own file inside directory `f`; the file name is
+  * `prefix` immediately followed by the model ID.
+  *
+  * @param f Directory that holds the model files
+  * @param config Storage client configuration (not read by this class)
+  * @param prefix File name prefix
+  */
+class LocalFSModels(f: File, config: StorageClientConfig, prefix: String)
+  extends Models with Logging {
+
+  /** Writes the bytes of a model to its file, replacing any existing content.
+    *
+    * A FileNotFoundException raised while opening the file is logged and
+    * swallowed; other failures propagate to the caller.
+    *
+    * @param i Model to store
+    */
+  def insert(i: Model): Unit = {
+    try {
+      val fos = new FileOutputStream(new File(f, s"${prefix}${i.id}"))
+      // Release the file handle even when the write fails half way.
+      try {
+        fos.write(i.models)
+      } finally {
+        fos.close()
+      }
+    } catch {
+      case e: FileNotFoundException => error(e.getMessage)
+    }
+  }
+
+  /** Reads a model back from its file.
+    *
+    * @param id Model ID
+    * @return The model, or None (after logging the error) when the file cannot
+    *         be read
+    */
+  def get(id: String): Option[Model] = {
+    try {
+      // Read the raw bytes in one call; this opens and closes the file itself
+      // and avoids a round trip through characters.
+      val file = new File(f, s"${prefix}${id}")
+      Some(Model(
+        id = id,
+        models = java.nio.file.Files.readAllBytes(file.toPath)))
+    } catch {
+      // Fatal errors such as OutOfMemoryError are left to propagate.
+      case scala.util.control.NonFatal(e) =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  /** Deletes the file of a model and logs an error when that fails.
+    *
+    * @param id Model ID
+    */
+  def delete(id: String): Unit = {
+    val m = new File(f, s"${prefix}${id}")
+    if (!m.delete) error(s"Unable to delete ${m.getCanonicalPath}!")
+  }
+}