You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:40 UTC
[09/34] incubator-predictionio git commit: rename all except examples
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala
new file mode 100644
index 0000000..52c8b44
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala
@@ -0,0 +1,86 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.App
+import org.apache.predictionio.data.storage.Apps
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[Apps]] */
+class JDBCApps(client: String, config: StorageClientConfig, prefix: String)
+ extends Apps with Logging {
+ /** Database table name for this data access object */
+ val tableName = JDBCUtils.prefixTableName(prefix, "apps")
+ DB autoCommit { implicit session =>
+ sql"""
+ create table if not exists $tableName (
+ id serial not null primary key,
+ name text not null,
+ description text)""".execute.apply()
+ }
+
+ def insert(app: App): Option[Int] = DB localTx { implicit session =>
+ val q = if (app.id == 0) {
+ sql"""
+ insert into $tableName (name, description) values(${app.name}, ${app.description})
+ """
+ } else {
+ sql"""
+ insert into $tableName values(${app.id}, ${app.name}, ${app.description})
+ """
+ }
+ Some(q.updateAndReturnGeneratedKey().apply().toInt)
+ }
+
+ def get(id: Int): Option[App] = DB readOnly { implicit session =>
+ sql"SELECT id, name, description FROM $tableName WHERE id = ${id}".map(rs =>
+ App(
+ id = rs.int("id"),
+ name = rs.string("name"),
+ description = rs.stringOpt("description"))
+ ).single().apply()
+ }
+
+ def getByName(name: String): Option[App] = DB readOnly { implicit session =>
+ sql"SELECT id, name, description FROM $tableName WHERE name = ${name}".map(rs =>
+ App(
+ id = rs.int("id"),
+ name = rs.string("name"),
+ description = rs.stringOpt("description"))
+ ).single().apply()
+ }
+
+ def getAll(): Seq[App] = DB readOnly { implicit session =>
+ sql"SELECT id, name, description FROM $tableName".map(rs =>
+ App(
+ id = rs.int("id"),
+ name = rs.string("name"),
+ description = rs.stringOpt("description"))
+ ).list().apply()
+ }
+
+ def update(app: App): Unit = DB localTx { implicit session =>
+ sql"""
+ update $tableName set name = ${app.name}, description = ${app.description}
+ where id = ${app.id}""".update().apply()
+ }
+
+ def delete(id: Int): Unit = DB localTx { implicit session =>
+ sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala
new file mode 100644
index 0000000..f94a64a
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala
@@ -0,0 +1,66 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Channel
+import org.apache.predictionio.data.storage.Channels
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[Channels]] */
+class JDBCChannels(client: String, config: StorageClientConfig, prefix: String)
+ extends Channels with Logging {
+ /** Database table name for this data access object */
+ val tableName = JDBCUtils.prefixTableName(prefix, "channels")
+ DB autoCommit { implicit session =>
+ sql"""
+ create table if not exists $tableName (
+ id serial not null primary key,
+ name text not null,
+ appid integer not null)""".execute().apply()
+ }
+
+ def insert(channel: Channel): Option[Int] = DB localTx { implicit session =>
+ val q = if (channel.id == 0) {
+ sql"INSERT INTO $tableName (name, appid) VALUES(${channel.name}, ${channel.appid})"
+ } else {
+ sql"INSERT INTO $tableName VALUES(${channel.id}, ${channel.name}, ${channel.appid})"
+ }
+ Some(q.updateAndReturnGeneratedKey().apply().toInt)
+ }
+
+ def get(id: Int): Option[Channel] = DB localTx { implicit session =>
+ sql"SELECT id, name, appid FROM $tableName WHERE id = $id".
+ map(resultToChannel).single().apply()
+ }
+
+ def getByAppid(appid: Int): Seq[Channel] = DB localTx { implicit session =>
+ sql"SELECT id, name, appid FROM $tableName WHERE appid = $appid".
+ map(resultToChannel).list().apply()
+ }
+
+ def delete(id: Int): Unit = DB localTx { implicit session =>
+ sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+ }
+
+ def resultToChannel(rs: WrappedResultSet): Channel = {
+ Channel(
+ id = rs.int("id"),
+ name = rs.string("name"),
+ appid = rs.int("appid"))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala
new file mode 100644
index 0000000..a4bd640
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala
@@ -0,0 +1,194 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineInstance
+import org.apache.predictionio.data.storage.EngineInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[EngineInstances]] */
+class JDBCEngineInstances(client: String, config: StorageClientConfig, prefix: String)
+ extends EngineInstances with Logging {
+ /** Database table name for this data access object */
+ val tableName = JDBCUtils.prefixTableName(prefix, "engineinstances")
+ DB autoCommit { implicit session =>
+ sql"""
+ create table if not exists $tableName (
+ id varchar(100) not null primary key,
+ status text not null,
+ startTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ endTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ engineId text not null,
+ engineVersion text not null,
+ engineVariant text not null,
+ engineFactory text not null,
+ batch text not null,
+ env text not null,
+ sparkConf text not null,
+ datasourceParams text not null,
+ preparatorParams text not null,
+ algorithmsParams text not null,
+ servingParams text not null)""".execute().apply()
+ }
+
+ def insert(i: EngineInstance): String = DB localTx { implicit session =>
+ val id = java.util.UUID.randomUUID().toString
+ sql"""
+ INSERT INTO $tableName VALUES(
+ $id,
+ ${i.status},
+ ${i.startTime},
+ ${i.endTime},
+ ${i.engineId},
+ ${i.engineVersion},
+ ${i.engineVariant},
+ ${i.engineFactory},
+ ${i.batch},
+ ${JDBCUtils.mapToString(i.env)},
+ ${JDBCUtils.mapToString(i.sparkConf)},
+ ${i.dataSourceParams},
+ ${i.preparatorParams},
+ ${i.algorithmsParams},
+ ${i.servingParams})""".update().apply()
+ id
+ }
+
+ def get(id: String): Option[EngineInstance] = DB localTx { implicit session =>
+ sql"""
+ SELECT
+ id,
+ status,
+ startTime,
+ endTime,
+ engineId,
+ engineVersion,
+ engineVariant,
+ engineFactory,
+ batch,
+ env,
+ sparkConf,
+ datasourceParams,
+ preparatorParams,
+ algorithmsParams,
+ servingParams
+ FROM $tableName WHERE id = $id""".map(resultToEngineInstance).
+ single().apply()
+ }
+
+ def getAll(): Seq[EngineInstance] = DB localTx { implicit session =>
+ sql"""
+ SELECT
+ id,
+ status,
+ startTime,
+ endTime,
+ engineId,
+ engineVersion,
+ engineVariant,
+ engineFactory,
+ batch,
+ env,
+ sparkConf,
+ datasourceParams,
+ preparatorParams,
+ algorithmsParams,
+ servingParams
+ FROM $tableName""".map(resultToEngineInstance).list().apply()
+ }
+
+ def getLatestCompleted(
+ engineId: String,
+ engineVersion: String,
+ engineVariant: String): Option[EngineInstance] =
+ getCompleted(engineId, engineVersion, engineVariant).headOption
+
+ def getCompleted(
+ engineId: String,
+ engineVersion: String,
+ engineVariant: String): Seq[EngineInstance] = DB localTx { implicit s =>
+ sql"""
+ SELECT
+ id,
+ status,
+ startTime,
+ endTime,
+ engineId,
+ engineVersion,
+ engineVariant,
+ engineFactory,
+ batch,
+ env,
+ sparkConf,
+ datasourceParams,
+ preparatorParams,
+ algorithmsParams,
+ servingParams
+ FROM $tableName
+ WHERE
+ status = 'COMPLETED' AND
+ engineId = $engineId AND
+ engineVersion = $engineVersion AND
+ engineVariant = $engineVariant
+ ORDER BY startTime DESC""".
+ map(resultToEngineInstance).list().apply()
+ }
+
+ def update(i: EngineInstance): Unit = DB localTx { implicit session =>
+ sql"""
+ update $tableName set
+ status = ${i.status},
+ startTime = ${i.startTime},
+ endTime = ${i.endTime},
+ engineId = ${i.engineId},
+ engineVersion = ${i.engineVersion},
+ engineVariant = ${i.engineVariant},
+ engineFactory = ${i.engineFactory},
+ batch = ${i.batch},
+ env = ${JDBCUtils.mapToString(i.env)},
+ sparkConf = ${JDBCUtils.mapToString(i.sparkConf)},
+ datasourceParams = ${i.dataSourceParams},
+ preparatorParams = ${i.preparatorParams},
+ algorithmsParams = ${i.algorithmsParams},
+ servingParams = ${i.servingParams}
+ where id = ${i.id}""".update().apply()
+ }
+
+ def delete(id: String): Unit = DB localTx { implicit session =>
+ sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+ }
+
+ /** Convert JDBC results to [[EngineInstance]] */
+ def resultToEngineInstance(rs: WrappedResultSet): EngineInstance = {
+ EngineInstance(
+ id = rs.string("id"),
+ status = rs.string("status"),
+ startTime = rs.jodaDateTime("startTime"),
+ endTime = rs.jodaDateTime("endTime"),
+ engineId = rs.string("engineId"),
+ engineVersion = rs.string("engineVersion"),
+ engineVariant = rs.string("engineVariant"),
+ engineFactory = rs.string("engineFactory"),
+ batch = rs.string("batch"),
+ env = JDBCUtils.stringToMap(rs.string("env")),
+ sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")),
+ dataSourceParams = rs.string("datasourceParams"),
+ preparatorParams = rs.string("preparatorParams"),
+ algorithmsParams = rs.string("algorithmsParams"),
+ servingParams = rs.string("servingParams"))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala
new file mode 100644
index 0000000..b766689
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala
@@ -0,0 +1,111 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.data.storage.EngineManifests
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[EngineManifests]] */
+class JDBCEngineManifests(client: String, config: StorageClientConfig, prefix: String)
+ extends EngineManifests with Logging {
+ /** Database table name for this data access object */
+ val tableName = JDBCUtils.prefixTableName(prefix, "enginemanifests")
+ DB autoCommit { implicit session =>
+ sql"""
+ create table if not exists $tableName (
+ id varchar(100) not null primary key,
+ version text not null,
+ engineName text not null,
+ description text,
+ files text not null,
+ engineFactory text not null)""".execute().apply()
+ }
+
+ def insert(m: EngineManifest): Unit = DB localTx { implicit session =>
+ sql"""
+ INSERT INTO $tableName VALUES(
+ ${m.id},
+ ${m.version},
+ ${m.name},
+ ${m.description},
+ ${m.files.mkString(",")},
+ ${m.engineFactory})""".update().apply()
+ }
+
+ def get(id: String, version: String): Option[EngineManifest] = DB localTx { implicit session =>
+ sql"""
+ SELECT
+ id,
+ version,
+ engineName,
+ description,
+ files,
+ engineFactory
+ FROM $tableName WHERE id = $id AND version = $version""".
+ map(resultToEngineManifest).single().apply()
+ }
+
+ def getAll(): Seq[EngineManifest] = DB localTx { implicit session =>
+ sql"""
+ SELECT
+ id,
+ version,
+ engineName,
+ description,
+ files,
+ engineFactory
+ FROM $tableName""".map(resultToEngineManifest).list().apply()
+ }
+
+ /** Update the manifest row matching (id, version).
+   * If no row matches, insert the manifest when `upsert` is set, else report via `error`. */
+ def update(m: EngineManifest, upsert: Boolean = false): Unit = {
+   // The transaction block yields the affected-row count, so no mutable var is needed.
+   val updatedRows = DB localTx { implicit session =>
+     sql"""
+     update $tableName set
+       engineName = ${m.name},
+       description = ${m.description},
+       files = ${m.files.mkString(",")},
+       engineFactory = ${m.engineFactory}
+     where id = ${m.id} and version = ${m.version}""".update().apply()
+   }
+   if (updatedRows == 0) {
+     // insert(m) opens its own transaction, so update-then-insert is not atomic.
+     if (upsert) insert(m)
+     else error("Cannot find a record to update, and upsert is not enabled.")
+   }
+ }
+
+ def delete(id: String, version: String): Unit = DB localTx { implicit session =>
+ sql"DELETE FROM $tableName WHERE id = $id AND version = $version".
+ update().apply()
+ }
+
+ /** Convert one JDBC result row to an [[EngineManifest]]; `files` is stored comma-joined. */
+ def resultToEngineManifest(rs: WrappedResultSet): EngineManifest = {
+   EngineManifest(
+     id = rs.string("id"),
+     version = rs.string("version"),
+     name = rs.string("engineName"),
+     description = rs.stringOpt("description"),
+     files = rs.string("files").split(",").filter(_.nonEmpty), // "".split(",") is Array("")
+     engineFactory = rs.string("engineFactory"))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala
new file mode 100644
index 0000000..1811271
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala
@@ -0,0 +1,162 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EvaluationInstance
+import org.apache.predictionio.data.storage.EvaluationInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[EvaluationInstances]] */
+class JDBCEvaluationInstances(client: String, config: StorageClientConfig, prefix: String)
+ extends EvaluationInstances with Logging {
+ /** Database table name for this data access object */
+ val tableName = JDBCUtils.prefixTableName(prefix, "evaluationinstances")
+ DB autoCommit { implicit session =>
+ sql"""
+ create table if not exists $tableName (
+ id varchar(100) not null primary key,
+ status text not null,
+ startTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ endTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ evaluationClass text not null,
+ engineParamsGeneratorClass text not null,
+ batch text not null,
+ env text not null,
+ sparkConf text not null,
+ evaluatorResults text not null,
+ evaluatorResultsHTML text not null,
+ evaluatorResultsJSON text)""".execute().apply()
+ }
+
+ def insert(i: EvaluationInstance): String = DB localTx { implicit session =>
+ val id = java.util.UUID.randomUUID().toString
+ sql"""
+ INSERT INTO $tableName VALUES(
+ $id,
+ ${i.status},
+ ${i.startTime},
+ ${i.endTime},
+ ${i.evaluationClass},
+ ${i.engineParamsGeneratorClass},
+ ${i.batch},
+ ${JDBCUtils.mapToString(i.env)},
+ ${JDBCUtils.mapToString(i.sparkConf)},
+ ${i.evaluatorResults},
+ ${i.evaluatorResultsHTML},
+ ${i.evaluatorResultsJSON})""".update().apply()
+ id
+ }
+
+ def get(id: String): Option[EvaluationInstance] = DB localTx { implicit session =>
+ sql"""
+ SELECT
+ id,
+ status,
+ startTime,
+ endTime,
+ evaluationClass,
+ engineParamsGeneratorClass,
+ batch,
+ env,
+ sparkConf,
+ evaluatorResults,
+ evaluatorResultsHTML,
+ evaluatorResultsJSON
+ FROM $tableName WHERE id = $id
+ """.map(resultToEvaluationInstance).single().apply()
+ }
+
+ def getAll(): Seq[EvaluationInstance] = DB localTx { implicit session =>
+ sql"""
+ SELECT
+ id,
+ status,
+ startTime,
+ endTime,
+ evaluationClass,
+ engineParamsGeneratorClass,
+ batch,
+ env,
+ sparkConf,
+ evaluatorResults,
+ evaluatorResultsHTML,
+ evaluatorResultsJSON
+ FROM $tableName
+ """.map(resultToEvaluationInstance).list().apply()
+ }
+
+ def getCompleted(): Seq[EvaluationInstance] = DB localTx { implicit s =>
+ sql"""
+ SELECT
+ id,
+ status,
+ startTime,
+ endTime,
+ evaluationClass,
+ engineParamsGeneratorClass,
+ batch,
+ env,
+ sparkConf,
+ evaluatorResults,
+ evaluatorResultsHTML,
+ evaluatorResultsJSON
+ FROM $tableName
+ WHERE
+ status = 'EVALCOMPLETED'
+ ORDER BY starttime DESC
+ """.map(resultToEvaluationInstance).list().apply()
+ }
+
+ def update(i: EvaluationInstance): Unit = DB localTx { implicit session =>
+ sql"""
+ update $tableName set
+ status = ${i.status},
+ startTime = ${i.startTime},
+ endTime = ${i.endTime},
+ evaluationClass = ${i.evaluationClass},
+ engineParamsGeneratorClass = ${i.engineParamsGeneratorClass},
+ batch = ${i.batch},
+ env = ${JDBCUtils.mapToString(i.env)},
+ sparkConf = ${JDBCUtils.mapToString(i.sparkConf)},
+ evaluatorResults = ${i.evaluatorResults},
+ evaluatorResultsHTML = ${i.evaluatorResultsHTML},
+ evaluatorResultsJSON = ${i.evaluatorResultsJSON}
+ where id = ${i.id}""".update().apply()
+ }
+
+ def delete(id: String): Unit = DB localTx { implicit session =>
+ sql"DELETE FROM $tableName WHERE id = $id".update().apply()
+ }
+
+ /** Convert a JDBC row to an [[EvaluationInstance]]; NULL evaluatorResultsJSON becomes "". */
+ def resultToEvaluationInstance(rs: WrappedResultSet): EvaluationInstance = {
+   EvaluationInstance(
+     id = rs.string("id"),
+     status = rs.string("status"),
+     startTime = rs.jodaDateTime("startTime"),
+     endTime = rs.jodaDateTime("endTime"),
+     evaluationClass = rs.string("evaluationClass"),
+     engineParamsGeneratorClass = rs.string("engineParamsGeneratorClass"),
+     batch = rs.string("batch"),
+     env = JDBCUtils.stringToMap(rs.string("env")),
+     sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")),
+     evaluatorResults = rs.string("evaluatorResults"),
+     evaluatorResultsHTML = rs.string("evaluatorResultsHTML"),
+     evaluatorResultsJSON = rs.stringOpt("evaluatorResultsJSON").getOrElse(""))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
new file mode 100644
index 0000000..945879c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala
@@ -0,0 +1,241 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+import org.json4s.JObject
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+import scalikejdbc._
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+/** JDBC implementation of [[LEvents]] */
+class JDBCLEvents(
+ client: String,
+ config: StorageClientConfig,
+ namespace: String) extends LEvents with Logging {
+ implicit private val formats = org.json4s.DefaultFormats
+
+ def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+
+ // To be indexed, a column must be a length-limited VARCHAR (255 characters here), not TEXT
+ val useIndex = config.properties.contains("INDEX") &&
+ config.properties("INDEX").equalsIgnoreCase("enabled")
+
+ val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
+ val entityIdIndexName = s"idx_${tableName}_ei"
+ val entityTypeIndexName = s"idx_${tableName}_et"
+ DB autoCommit { implicit session =>
+ if (useIndex) {
+ SQL(s"""
+ create table if not exists $tableName (
+ id varchar(32) not null primary key,
+ event varchar(255) not null,
+ entityType varchar(255) not null,
+ entityId varchar(255) not null,
+ targetEntityType text,
+ targetEntityId text,
+ properties text,
+ eventTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ eventTimeZone varchar(50) not null,
+ tags text,
+ prId text,
+ creationTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ creationTimeZone varchar(50) not null)""").execute().apply()
+
+ // create index
+ SQL(s"create index $entityIdIndexName on $tableName (entityId)").execute().apply()
+ SQL(s"create index $entityTypeIndexName on $tableName (entityType)").execute().apply()
+ } else {
+ SQL(s"""
+ create table if not exists $tableName (
+ id varchar(32) not null primary key,
+ event text not null,
+ entityType text not null,
+ entityId text not null,
+ targetEntityType text,
+ targetEntityId text,
+ properties text,
+ eventTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ eventTimeZone varchar(50) not null,
+ tags text,
+ prId text,
+ creationTime timestamp DEFAULT CURRENT_TIMESTAMP,
+ creationTimeZone varchar(50) not null)""").execute().apply()
+ }
+ true
+ }
+ }
+
+ def remove(appId: Int, channelId: Option[Int] = None): Boolean =
+ DB autoCommit { implicit session =>
+ SQL(s"""
+ drop table ${JDBCUtils.eventTableName(namespace, appId, channelId)}
+ """).execute().apply()
+ true
+ }
+
+ def close(): Unit = ConnectionPool.closeAll()
+
+ def futureInsert(event: Event, appId: Int, channelId: Option[Int])(
+ implicit ec: ExecutionContext): Future[String] = Future {
+ DB localTx { implicit session =>
+ val id = event.eventId.getOrElse(JDBCUtils.generateId)
+ val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+ sql"""
+ insert into $tableName values(
+ $id,
+ ${event.event},
+ ${event.entityType},
+ ${event.entityId},
+ ${event.targetEntityType},
+ ${event.targetEntityId},
+ ${write(event.properties.toJObject)},
+ ${event.eventTime},
+ ${event.eventTime.getZone.getID},
+ ${if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None},
+ ${event.prId},
+ ${event.creationTime},
+ ${event.creationTime.getZone.getID}
+ )
+ """.update().apply()
+ id
+ }
+ }
+
+ def futureGet(eventId: String, appId: Int, channelId: Option[Int])(
+ implicit ec: ExecutionContext): Future[Option[Event]] = Future {
+ DB readOnly { implicit session =>
+ val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+ sql"""
+ select
+ id,
+ event,
+ entityType,
+ entityId,
+ targetEntityType,
+ targetEntityId,
+ properties,
+ eventTime,
+ eventTimeZone,
+ tags,
+ prId,
+ creationTime,
+ creationTimeZone
+ from $tableName
+ where id = $eventId
+ """.map(resultToEvent).single().apply()
+ }
+ }
+
+ /** Delete one event by ID; the future yields true only if a row was actually removed. */
+ def futureDelete(eventId: String, appId: Int, channelId: Option[Int])(
+   implicit ec: ExecutionContext): Future[Boolean] = Future {
+   DB localTx { implicit session =>
+     val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+     // update() returns the affected-row count, so 0 means no event had this ID.
+     val deleted = sql"delete from $tableName where id = $eventId".update().apply()
+     deleted > 0
+   }
+ }
+
+ def futureFind(
+ appId: Int,
+ channelId: Option[Int] = None,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ entityType: Option[String] = None,
+ entityId: Option[String] = None,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None,
+ limit: Option[Int] = None,
+ reversed: Option[Boolean] = None
+ )(implicit ec: ExecutionContext): Future[Iterator[Event]] = Future {
+ DB readOnly { implicit session =>
+ val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
+ val whereClause = sqls.toAndConditionOpt(
+ startTime.map(x => sqls"eventTime >= $x"),
+ untilTime.map(x => sqls"eventTime < $x"),
+ entityType.map(x => sqls"entityType = $x"),
+ entityId.map(x => sqls"entityId = $x"),
+ eventNames.map(x =>
+ sqls.toOrConditionOpt(x.map(y =>
+ Some(sqls"event = $y")
+ ): _*)
+ ).getOrElse(None),
+ targetEntityType.map(x => x.map(y => sqls"targetEntityType = $y")
+ .getOrElse(sqls"targetEntityType IS NULL")),
+ targetEntityId.map(x => x.map(y => sqls"targetEntityId = $y")
+ .getOrElse(sqls"targetEntityId IS NULL"))
+ ).map(sqls.where(_)).getOrElse(sqls"")
+ val orderByClause = reversed.map(x =>
+ if (x) sqls"eventTime desc" else sqls"eventTime asc"
+ ).getOrElse(sqls"eventTime asc")
+ val limitClause = limit.map(x =>
+ if (x < 0) sqls"" else sqls.limit(x)
+ ).getOrElse(sqls"")
+ val q = sql"""
+ select
+ id,
+ event,
+ entityType,
+ entityId,
+ targetEntityType,
+ targetEntityId,
+ properties,
+ eventTime,
+ eventTimeZone,
+ tags,
+ prId,
+ creationTime,
+ creationTimeZone
+ from $tableName
+ $whereClause
+ order by $orderByClause
+ $limitClause
+ """
+ q.map(resultToEvent).list().apply().toIterator
+ }
+ }
+
+ /** Convert one JDBC result row to an [[Event]], restoring each timestamp's stored time zone. */
+ private[predictionio] def resultToEvent(rs: WrappedResultSet): Event = {
+   Event(
+     eventId = rs.stringOpt("id"),
+     event = rs.string("event"),
+     entityType = rs.string("entityType"),
+     entityId = rs.string("entityId"),
+     targetEntityType = rs.stringOpt("targetEntityType"),
+     targetEntityId = rs.stringOpt("targetEntityId"),
+     properties = rs.stringOpt("properties").map(p =>
+       DataMap(read[JObject](p))).getOrElse(DataMap()), // NULL column => empty DataMap
+     eventTime = new DateTime(rs.jodaDateTime("eventTime"),
+       DateTimeZone.forID(rs.string("eventTimeZone"))),
+     tags = rs.stringOpt("tags").map(t => t.split(",").toList).getOrElse(Nil),
+     prId = rs.stringOpt("prId"),
+     creationTime = new DateTime(rs.jodaDateTime("creationTime"),
+       DateTimeZone.forID(rs.string("creationTimeZone"))))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
new file mode 100644
index 0000000..01ed6ca
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
@@ -0,0 +1,52 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[Models]] */
+class JDBCModels(client: String, config: StorageClientConfig, prefix: String)
+ extends Models with Logging {
+ /** Database table name for this data access object */
+ val tableName = JDBCUtils.prefixTableName(prefix, "models")
+
+ /** Determines binary column type based on JDBC driver type */
+ val binaryColumnType = JDBCUtils.binaryColumnType(client)
+ DB autoCommit { implicit session =>
+ sql"""
+ create table if not exists $tableName (
+ id varchar(100) not null primary key,
+ models $binaryColumnType not null)""".execute().apply()
+ }
+
+ def insert(i: Model): Unit = DB localTx { implicit session =>
+ sql"insert into $tableName values(${i.id}, ${i.models})".update().apply()
+ }
+
+ def get(id: String): Option[Model] = DB readOnly { implicit session =>
+ sql"select id, models from $tableName where id = $id".map { r =>
+ Model(id = r.string("id"), models = r.bytes("models"))
+ }.single().apply()
+ }
+
+ def delete(id: String): Unit = DB localTx { implicit session =>
+ sql"delete from $tableName where id = $id".execute().apply()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
new file mode 100644
index 0000000..c01989c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
@@ -0,0 +1,160 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import java.sql.{DriverManager, ResultSet}
+
+import com.github.nscala_time.time.Imports._
+import org.apache.predictionio.data.storage.{DataMap, Event, PEvents, StorageClientConfig}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.{JdbcRDD, RDD}
+import org.apache.spark.sql.{SQLContext, SaveMode}
+import org.json4s.JObject
+import org.json4s.native.Serialization
+
+/** JDBC implementation of [[PEvents]]
+  *
+  * @param client JDBC connection URL; passed to `DriverManager.getConnection` when
+  *               reading and to Spark's JDBC writer when writing
+  * @param config Storage client configuration; the `USERNAME` and `PASSWORD`
+  *               properties are required, `PARTITIONS` is optional (default 4)
+  * @param namespace Namespace used to derive event table names
+  */
+class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String) extends PEvents {
+  @transient private implicit lazy val formats = org.json4s.DefaultFormats
+
+  /** Reads events from the event table of an app/channel as an RDD.
+    *
+    * @param appId App ID whose event table is read
+    * @param channelId Optional channel ID (part of the table name)
+    * @param startTime Only events with eventTime >= startTime (default: epoch)
+    * @param untilTime Only events with eventTime < untilTime (default: now + 1 year)
+    * @param entityType Optional entityType filter
+    * @param entityId Optional entityId filter
+    * @param eventNames Optional list of event names; an event matches if it has any of them
+    * @param targetEntityType None: no restriction; Some(None): must be null;
+    *                         Some(Some(x)): must equal x
+    * @param targetEntityId None: no restriction; Some(None): must be null;
+    *                       Some(Some(x)): must equal x
+    * @param sc Spark context
+    * @return Cached RDD of matching events
+    */
+  def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = {
+    val lower = startTime.map(_.getMillis).getOrElse(0.toLong)
+    /** Change the default upper bound from +100 to +1 year because MySQL's
+      * FROM_UNIXTIME(t) will return NULL if we use +100 years.
+      */
+    val upper = untilTime.map(_.getMillis).getOrElse((DateTime.now + 1.years).getMillis)
+    // One partition per day of the window, capped by the PARTITIONS property.
+    // Clamp to at least 1: a window shorter than one day has 0 standard days,
+    // and a JdbcRDD with 0 partitions would silently return no events at all.
+    val par = scala.math.max(
+      1L,
+      scala.math.min(
+        new Duration(upper - lower).getStandardDays,
+        config.properties.getOrElse("PARTITIONS", "4").toLong)).toInt
+
+    // JdbcRDD only binds the two range placeholders, so filter values have to be
+    // inlined into the SQL text. Double embedded single quotes so that a value
+    // containing ' neither breaks the statement nor terminates the literal.
+    // NOTE(review): MySQL also treats backslash as an escape character by
+    // default, so this is not a complete defence for untrusted input - confirm.
+    def sqlLiteral(value: String): String = "'" + value.replace("'", "''") + "'"
+
+    val entityTypeClause =
+      entityType.map(x => s"and entityType = ${sqlLiteral(x)}").getOrElse("")
+    val entityIdClause =
+      entityId.map(x => s"and entityId = ${sqlLiteral(x)}").getOrElse("")
+    val eventNamesClause = eventNames.map { names =>
+      "and (" + names.map(y => s"event = ${sqlLiteral(y)}").mkString(" or ") + ")"
+    }.getOrElse("")
+    val targetEntityTypeClause = targetEntityType.map(
+      _.map(x => s"and targetEntityType = ${sqlLiteral(x)}")
+        .getOrElse("and targetEntityType is null")).getOrElse("")
+    val targetEntityIdClause = targetEntityId.map(
+      _.map(x => s"and targetEntityId = ${sqlLiteral(x)}")
+        .getOrElse("and targetEntityId is null")).getOrElse("")
+    // NOTE(review): JdbcRDD documents both partition bounds as inclusive, while
+    // the upper comparison below is exclusive; events in the last second of an
+    // interior partition may therefore be skipped - confirm and adjust if so.
+    val q = s"""
+ select
+ id,
+ event,
+ entityType,
+ entityId,
+ targetEntityType,
+ targetEntityId,
+ properties,
+ eventTime,
+ eventTimeZone,
+ tags,
+ prId,
+ creationTime,
+ creationTimeZone
+ from ${JDBCUtils.eventTableName(namespace, appId, channelId)}
+ where
+ eventTime >= ${JDBCUtils.timestampFunction(client)}(?) and
+ eventTime < ${JDBCUtils.timestampFunction(client)}(?)
+ $entityTypeClause
+ $entityIdClause
+ $eventNamesClause
+ $targetEntityTypeClause
+ $targetEntityIdClause
+ """.replace("\n", " ")
+    new JdbcRDD(
+      sc,
+      () => {
+        DriverManager.getConnection(
+          client,
+          config.properties("USERNAME"),
+          config.properties("PASSWORD"))
+      },
+      q,
+      // The bounds are bound to the two placeholders in seconds, not milliseconds.
+      lower / 1000,
+      upper / 1000,
+      par,
+      (r: ResultSet) => {
+        Event(
+          eventId = Option(r.getString("id")),
+          event = r.getString("event"),
+          entityType = r.getString("entityType"),
+          entityId = r.getString("entityId"),
+          targetEntityType = Option(r.getString("targetEntityType")),
+          targetEntityId = Option(r.getString("targetEntityId")),
+          // A null properties column becomes an empty DataMap.
+          properties = Option(r.getString("properties")).map(x =>
+            DataMap(Serialization.read[JObject](x))).getOrElse(DataMap()),
+          eventTime = new DateTime(r.getTimestamp("eventTime").getTime,
+            DateTimeZone.forID(r.getString("eventTimeZone"))),
+          // Tags are stored as a single comma-separated string.
+          tags = Option(r.getString("tags")).map(x =>
+            x.split(",").toList).getOrElse(Nil),
+          prId = Option(r.getString("prId")),
+          creationTime = new DateTime(r.getTimestamp("creationTime").getTime,
+            DateTimeZone.forID(r.getString("creationTimeZone"))))
+      }).cache()
+  }
+
+  /** Appends events to the event table of an app/channel through Spark's JDBC writer.
+    *
+    * Events without an ID get one from [[JDBCUtils.generateId]].
+    *
+    * @param events Events to store
+    * @param appId App ID whose event table is written
+    * @param channelId Optional channel ID (part of the table name)
+    * @param sc Spark context
+    */
+  def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val sqlContext = new SQLContext(sc)
+
+    import sqlContext.implicits._
+
+    val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
+
+    // Column names, in the same order as the tuple elements built below.
+    val eventTableColumns = Seq[String](
+      "id",
+      "event",
+      "entityType",
+      "entityId",
+      "targetEntityType",
+      "targetEntityId",
+      "properties",
+      "eventTime",
+      "eventTimeZone",
+      "tags",
+      "prId",
+      "creationTime",
+      "creationTimeZone")
+
+    val eventDF = events.map { event =>
+      (
+        event.eventId.getOrElse(JDBCUtils.generateId),
+        event.event,
+        event.entityType,
+        event.entityId,
+        event.targetEntityType.orNull,
+        event.targetEntityId.orNull,
+        if (!event.properties.isEmpty) Serialization.write(event.properties.toJObject) else null,
+        new java.sql.Timestamp(event.eventTime.getMillis),
+        event.eventTime.getZone.getID,
+        // None (rather than a null Option reference) marks "no tags".
+        if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None,
+        event.prId,
+        new java.sql.Timestamp(event.creationTime.getMillis),
+        event.creationTime.getZone.getID)
+    }.toDF(eventTableColumns: _*)
+
+    // spark version 1.4.0 or higher
+    val prop = new java.util.Properties
+    prop.setProperty("user", config.properties("USERNAME"))
+    prop.setProperty("password", config.properties("PASSWORD"))
+    eventDF.write.mode(SaveMode.Append).jdbc(client, tableName, prop)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
new file mode 100644
index 0000000..e95b49b
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
@@ -0,0 +1,103 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import scalikejdbc._
+
+/** JDBC related utilities */
+object JDBCUtils {
+  /** Extract JDBC driver type from URL
+    *
+    * @param url JDBC URL
+    * @return The driver type, e.g. postgresql; an empty string when the URL
+    *         contains no `jdbc:<type>:` section
+    */
+  def driverType(url: String): String = {
+    val capture = """jdbc:([^:]+):""".r
+    capture.findFirstMatchIn(url).map(_.group(1)).getOrElse("")
+  }
+
+  /** Determines binary column type from JDBC URL
+    *
+    * @param url JDBC URL
+    * @return Binary column type as SQLSyntax, e.g. LONGBLOB
+    */
+  def binaryColumnType(url: String): SQLSyntax = {
+    driverType(url) match {
+      case "postgresql" => sqls"bytea"
+      case "mysql" => sqls"longblob"
+      // Unrecognized drivers get the same type as MySQL.
+      case _ => sqls"longblob"
+    }
+  }
+
+  /** Determines UNIX timestamp conversion function from JDBC URL
+    *
+    * @param url JDBC URL
+    * @return Timestamp conversion function, e.g. TO_TIMESTAMP
+    */
+  def timestampFunction(url: String): String = {
+    driverType(url) match {
+      case "postgresql" => "to_timestamp"
+      case "mysql" => "from_unixtime"
+      // Unrecognized drivers get the same function as MySQL.
+      case _ => "from_unixtime"
+    }
+  }
+
+  /** Converts Map of String to String to comma-separated list of key=value
+    *
+    * @param m Map of String to String
+    * @return Comma-separated list, e.g. FOO=BAR,X=Y,...
+    */
+  def mapToString(m: Map[String, String]): String = {
+    m.map { case (key, value) => s"$key=$value" }.mkString(",")
+  }
+
+  /** Inverse of mapToString
+    *
+    * An empty string yields an empty map. Only the first `=` of an entry
+    * separates key from value, so values may be empty or contain `=`; an entry
+    * without any `=` is read as a key with an empty value. Keys or values that
+    * contain `,` cannot be represented in this format.
+    *
+    * @param str Comma-separated list, e.g. FOO=BAR,X=Y,...
+    * @return Map of String to String, e.g. Map("FOO" -> "BAR", "X" -> "Y", ...)
+    */
+  def stringToMap(str: String): Map[String, String] = {
+    str.split(",")
+      // "".split(",") yields one empty entry; skip it instead of failing on it.
+      .filter(_.nonEmpty)
+      .map { entry =>
+        // Limit 2 keeps trailing empty values ("FOO=") and any '=' inside a value.
+        entry.split("=", 2) match {
+          case Array(key, value) => key -> value
+          case Array(key) => key -> ""
+        }
+      }.toMap[String, String]
+  }
+
+  /** Generate 32-character random ID using UUID with - stripped */
+  def generateId: String = java.util.UUID.randomUUID().toString.replace("-", "")
+
+  /** Prefix a table name
+    *
+    * The result is built with `sqls.createUnsafely`, i.e. without escaping, so
+    * both parts must be trusted identifiers.
+    *
+    * @param prefix Table prefix
+    * @param table Table name
+    * @return Prefixed table name
+    */
+  def prefixTableName(prefix: String, table: String): SQLSyntax =
+    sqls.createUnsafely(s"${prefix}_$table")
+
+  /** Derive event table name
+    *
+    * @param namespace Namespace of event tables
+    * @param appId App ID
+    * @param channelId Optional channel ID
+    * @return Full event table name, `namespace_appId` or `namespace_appId_channelId`
+    */
+  def eventTableName(namespace: String, appId: Int, channelId: Option[Int]): String =
+    s"${namespace}_${appId}${channelId.map("_" + _).getOrElse("")}"
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
new file mode 100644
index 0000000..6015870
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
@@ -0,0 +1,50 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import scalikejdbc._
+
+/** JDBC implementation of [[BaseStorageClient]]
+  *
+  * Construction validates the required properties and then sets up the
+  * ScalikeJDBC singleton connection pool.
+  *
+  * @param config Storage client configuration; `URL`, `USERNAME` and `PASSWORD`
+  *               are required, `CONNECTIONS` (pool size, default 8) is optional
+  */
+class StorageClient(val config: StorageClientConfig)
+    extends BaseStorageClient with Logging {
+  override val prefix = "JDBC"
+
+  // Check the required properties in order and fail on the first missing one.
+  Seq(
+    "URL" -> "The URL variable is not set!",
+    "USERNAME" -> "The USERNAME variable is not set!",
+    "PASSWORD" -> "The PASSWORD variable is not set!"
+  ).foreach { case (key, message) =>
+    if (!config.properties.contains(key)) {
+      throw new StorageClientException(message, null)
+    }
+  }
+
+  // set max size of connection pool
+  val maxSize: Int = config.properties.getOrElse("CONNECTIONS", "8").toInt
+  val settings = ConnectionPoolSettings(maxSize = maxSize)
+
+  /** JDBC connection URL. Connections are managed by ScalikeJDBC. */
+  val client = config.properties("URL")
+
+  ConnectionPool.singleton(
+    client,
+    config.properties("USERNAME"),
+    config.properties("PASSWORD"),
+    settings)
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
new file mode 100644
index 0000000..c423b29
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
@@ -0,0 +1,23 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+/** JDBC implementation of storage traits, supporting meta data, event data, and
+  * model data.
+  *
+  * Connection settings are read by this package's `StorageClient`; shared
+  * helpers (table naming, driver detection) are in `JDBCUtils`.
+  *
+  * @group Implementation
+  */
+package object jdbc {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
new file mode 100644
index 0000000..82989aa
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
@@ -0,0 +1,59 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.localfs
+
+import java.io.File
+import java.io.FileNotFoundException
+import java.io.FileOutputStream
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+
+import scala.io.Source
+
+/** Local file system implementation of [[Models]]
+  *
+  * Each model is stored as one file named `prefix + id` inside directory `f`.
+  *
+  * @param f Directory that holds the model files
+  * @param config Storage client configuration
+  * @param prefix Prefix prepended to the model ID to form the file name
+  */
+class LocalFSModels(f: File, config: StorageClientConfig, prefix: String)
+    extends Models with Logging {
+
+  /** Writes the model bytes to its file.
+    *
+    * A FileNotFoundException (the file cannot be opened for writing) is logged
+    * and swallowed; other I/O errors propagate to the caller.
+    */
+  def insert(i: Model): Unit = {
+    try {
+      val fos = new FileOutputStream(new File(f, s"${prefix}${i.id}"))
+      // Always release the file handle, even when the write itself fails.
+      try {
+        fos.write(i.models)
+      } finally {
+        fos.close()
+      }
+    } catch {
+      case e: FileNotFoundException => error(e.getMessage)
+    }
+  }
+
+  /** Reads a model back from its file.
+    *
+    * @param id Model ID
+    * @return The model, or None (after logging the error) when it cannot be read
+    */
+  def get(id: String): Option[Model] = {
+    try {
+      // ISO-8859-1 maps every byte to exactly one char, so char-to-byte
+      // conversion below recovers the raw file content.
+      val source = Source.fromFile(new File(f, s"${prefix}${id}"))(
+        scala.io.Codec.ISO8859)
+      try {
+        Some(Model(id = id, models = source.map(_.toByte).toArray))
+      } finally {
+        // The Source keeps the file open until it is closed explicitly.
+        source.close()
+      }
+    } catch {
+      // Let fatal errors (e.g. OutOfMemoryError) propagate instead of hiding them.
+      case scala.util.control.NonFatal(e) =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  /** Deletes the model file; a failed deletion is logged, not thrown. */
+  def delete(id: String): Unit = {
+    val m = new File(f, s"${prefix}${id}")
+    if (!m.delete) error(s"Unable to delete ${m.getCanonicalPath}!")
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala
new file mode 100644
index 0000000..8206384
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala
@@ -0,0 +1,43 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.localfs
+
+import java.io.File
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+
+/** Local file system implementation of [[BaseStorageClient]]
+  *
+  * The storage directory is taken from the `PATH` property, falling back to
+  * `HOSTS`. On construction the directory is created if it is missing;
+  * otherwise it must be a writable directory.
+  *
+  * @param config Storage client configuration
+  */
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+    with Logging {
+  override val prefix = "LocalFS"
+  val f = new File(
+    config.properties.getOrElse("PATH", config.properties("HOSTS")))
+
+  if (!f.exists) {
+    // Nothing there yet: try to create the directory, parents included.
+    if (!f.mkdirs) {
+      throw new StorageClientException(
+        s"${f} does not exist and automatic creation failed!",
+        null)
+    }
+  } else if (!f.isDirectory) {
+    throw new StorageClientException(
+      s"${f} already exists but it is not a directory!",
+      null)
+  } else if (!f.canWrite) {
+    throw new StorageClientException(
+      s"${f} already exists but it is not writable!",
+      null)
+  }
+
+  val client = f
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala
new file mode 100644
index 0000000..f245a06
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala
@@ -0,0 +1,22 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+/** Local file system implementation of storage traits, supporting model data
+  * only (see `LocalFSModels`).
+  *
+  * @group Implementation
+  */
+package object localfs {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/package.scala
new file mode 100644
index 0000000..09e6fa3
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/package.scala
@@ -0,0 +1,26 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data
+
+/** If you are an engine developer, please refer to the [[store]] package
+  * (for example its `LEventStore` and `PEventStore` objects).
+  *
+  * This package provides convenient access to underlying data access objects.
+  * The common entry point is [[Storage]].
+  *
+  * Developer APIs are available to advanced developers to add support for
+  * other data store backends.
+  */
+package object storage {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/Common.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/Common.scala b/data/src/main/scala/org/apache/predictionio/data/store/Common.scala
new file mode 100644
index 0000000..81b4b28
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/Common.scala
@@ -0,0 +1,50 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.store
+
+import org.apache.predictionio.data.storage.Storage
+import grizzled.slf4j.Logger
+
+/** Helpers shared by the event store front ends in this package.
+  *
+  * The access qualifier names the `predictionio` package: the enclosing
+  * package path is `org.apache.predictionio.data.store`, which has no
+  * `prediction` segment to qualify against.
+  */
+private[predictionio] object Common {
+
+  @transient lazy val logger = Logger[this.type]
+  @transient lazy private val appsDb = Storage.getMetaDataApps()
+  @transient lazy private val channelsDb = Storage.getMetaDataChannels()
+
+  /** Resolves an app name and an optional channel name to their IDs.
+    *
+    * @param appName Name of the app
+    * @param channelName Optional name of a channel of that app
+    * @return The app ID and, when a channel name was given, the channel ID
+    * @throws IllegalArgumentException if the app name or the channel name is
+    *                                  unknown (the error is logged first)
+    */
+  def appNameToId(appName: String, channelName: Option[String]): (Int, Option[Int]) = {
+    val app = appsDb.getByName(appName).getOrElse {
+      logger.error(s"Invalid app name ${appName}")
+      throw new IllegalArgumentException(s"Invalid app name ${appName}")
+    }
+
+    // Channel name -> channel ID for every channel of this app.
+    val channelMap: Map[String, Int] = channelsDb.getByAppid(app.id)
+      .map(c => (c.name, c.id)).toMap
+
+    val channelId: Option[Int] = channelName.map { ch =>
+      channelMap.getOrElse(ch, {
+        logger.error(s"Invalid channel name ${ch}.")
+        throw new IllegalArgumentException(s"Invalid channel name ${ch}.")
+      })
+    }
+
+    (app.id, channelId)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
new file mode 100644
index 0000000..ae38e7b
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
@@ -0,0 +1,142 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.store
+
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.data.storage.Event
+
+import org.joda.time.DateTime
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+
+/** Access to the Event Store that does not go through Spark's
+  * parallelization. Both methods block the calling thread until the
+  * underlying storage query completes or the timeout expires.
+  */
+object LEventStore {
+
+  private val defaultTimeout = Duration(60, "seconds")
+
+  @transient lazy private val eventsDb = Storage.getLEvents()
+
+  /** Reads the events of one specific entity. May be used in an Algorithm's
+    * predict() or in Serving logic for fast event store access.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param latest Return latest event first (default true)
+    * @param timeout Maximum time to wait for the storage query (default 60 seconds)
+    * @return Iterator[Event]
+    */
+  def findByEntity(
+    appName: String,
+    entityType: String,
+    entityId: String,
+    channelName: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None,
+    latest: Boolean = true,
+    timeout: Duration = defaultTimeout): Iterator[Event] = {
+
+    val (appId, channelId) = Common.appNameToId(appName, channelName)
+
+    // Entity type and ID are mandatory here, so they are always passed as Some.
+    val pendingEvents = eventsDb.futureFind(
+      appId = appId,
+      channelId = channelId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = Some(entityType),
+      entityId = Some(entityId),
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      limit = limit,
+      reversed = Some(latest))
+
+    Await.result(pendingEvents, timeout)
+  }
+
+  /** Reads events generically. If entityType or entityId is not specified, it
+    * results in a table scan.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    *   - None means no restriction on entityType
+    *   - Some(x) means entityType should match x.
+    * @param entityId return events of this entityId
+    *   - None means no restriction on entityId
+    *   - Some(x) means entityId should match x.
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param timeout Maximum time to wait for the storage query (default 60 seconds)
+    * @return Iterator[Event]
+    */
+  def find(
+    appName: String,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    channelName: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None,
+    timeout: Duration = defaultTimeout): Iterator[Event] = {
+
+    val (appId, channelId) = Common.appNameToId(appName, channelName)
+
+    val pendingEvents = eventsDb.futureFind(
+      appId = appId,
+      channelId = channelId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = entityType,
+      entityId = entityId,
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      limit = limit)
+
+    Await.result(pendingEvents, timeout)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/PEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/PEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/PEventStore.scala
new file mode 100644
index 0000000..b8f0037
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/PEventStore.scala
@@ -0,0 +1,116 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.store
+
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PropertyMap
+
+import org.joda.time.DateTime
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+/** Access to the Event Store through Spark's parallelization. App and channel
+  * names are resolved to IDs before the call is handed to the underlying
+  * events data access object.
+  */
+object PEventStore {
+
+  @transient lazy private val eventsDb = Storage.getPEvents()
+
+  /** Read events from Event Store
+    *
+    * @param appName return events of this app
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param sc Spark context
+    * @return RDD[Event]
+    */
+  def find(
+    appName: String,
+    channelName: Option[String] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None
+    )(sc: SparkContext): RDD[Event] = {
+
+    Common.appNameToId(appName, channelName) match {
+      case (appId, channelId) =>
+        eventsDb.find(
+          appId = appId,
+          channelId = channelId,
+          startTime = startTime,
+          untilTime = untilTime,
+          entityType = entityType,
+          entityId = entityId,
+          eventNames = eventNames,
+          targetEntityType = targetEntityType,
+          targetEntityId = targetEntityId
+        )(sc)
+    }
+  }
+
+  /** Aggregate properties of entities based on these special events:
+    * \$set, \$unset, \$delete events.
+    *
+    * @param appName use events of this app
+    * @param entityType aggregate properties of the entities of this entityType
+    * @param channelName use events of this channel (default channel if it's None)
+    * @param startTime use events with eventTime >= startTime
+    * @param untilTime use events with eventTime < untilTime
+    * @param required only keep entities with these required properties defined
+    * @param sc Spark context
+    * @return RDD[(String, PropertyMap)] RDD of entityId and PropertyMap pair
+    */
+  def aggregateProperties(
+    appName: String,
+    entityType: String,
+    channelName: Option[String] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    required: Option[Seq[String]] = None)
+    (sc: SparkContext): RDD[(String, PropertyMap)] = {
+
+    Common.appNameToId(appName, channelName) match {
+      case (appId, channelId) =>
+        eventsDb.aggregateProperties(
+          appId = appId,
+          entityType = entityType,
+          channelId = channelId,
+          startTime = startTime,
+          untilTime = untilTime,
+          required = required
+        )(sc)
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
new file mode 100644
index 0000000..fa14daf
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
@@ -0,0 +1,142 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.store.java
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.LEventStore
+import org.joda.time.DateTime
+
+import scala.collection.JavaConversions
+import scala.concurrent.duration.Duration
+
+/** Java-friendly facade over [[LEventStore]]: a set of operations to access
+  * the Event Store without going through Spark's parallelization.
+  *
+  * Each method converts its Java collection arguments to Scala, delegates to
+  * the [[LEventStore]] method of the same name, and wraps the result as a
+  * `java.util.List`.
+  */
+object LJavaEventStore {
+
+  /** Converts an optional Java list of names into an optional Scala sequence. */
+  private def toScalaSeq(names: Option[java.util.List[String]]) =
+    names.map(javaList => JavaConversions.asScalaBuffer(javaList).toSeq)
+
+  /** Reads events of the specified entity. May use this in Algorithm's predict()
+    * or Serving logic to have fast event store access.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param latest Return latest event first
+    * @param timeout forwarded unchanged to [[LEventStore.findByEntity]]
+    *   (presumably the maximum time to wait for the read; confirm there)
+    * @return java.util.List[Event]
+    */
+  def findByEntity(
+    appName: String,
+    entityType: String,
+    entityId: String,
+    channelName: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    limit: Option[Integer],
+    latest: Boolean,
+    timeout: Duration): java.util.List[Event] = {
+
+    // Java-side arguments translated to what the Scala API expects.
+    val scalaEventNames = toScalaSeq(eventNames)
+    val scalaLimit = limit.map(boxed => boxed.intValue())
+
+    val events = LEventStore.findByEntity(
+      appName,
+      entityType,
+      entityId,
+      channelName,
+      scalaEventNames,
+      targetEntityType,
+      targetEntityId,
+      startTime,
+      untilTime,
+      scalaLimit,
+      latest,
+      timeout
+    ).toSeq
+
+    JavaConversions.seqAsJavaList(events)
+  }
+
+  /** Reads events generically. If entityType or entityId is not specified, it
+    * results in table scan.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    *   - None means no restriction on entityType
+    *   - Some(x) means entityType should match x.
+    * @param entityId return events of this entityId
+    *   - None means no restriction on entityId
+    *   - Some(x) means entityId should match x.
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param timeout forwarded unchanged to [[LEventStore.find]]
+    *   (presumably the maximum time to wait for the read; confirm there)
+    * @return java.util.List[Event]
+    */
+  def find(
+    appName: String,
+    entityType: Option[String],
+    entityId: Option[String],
+    channelName: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    limit: Option[Integer],
+    timeout: Duration): java.util.List[Event] = {
+
+    // Java-side arguments translated to what the Scala API expects.
+    val scalaEventNames = toScalaSeq(eventNames)
+    val scalaLimit = limit.map(boxed => boxed.intValue())
+
+    val events = LEventStore.find(
+      appName,
+      entityType,
+      entityId,
+      channelName,
+      scalaEventNames,
+      targetEntityType,
+      targetEntityId,
+      startTime,
+      untilTime,
+      scalaLimit,
+      timeout
+    ).toSeq
+
+    JavaConversions.seqAsJavaList(events)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/java/OptionHelper.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/OptionHelper.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/OptionHelper.scala
new file mode 100644
index 0000000..b6d174b
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/OptionHelper.scala
@@ -0,0 +1,29 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.store.java
+
+/** Used by Java-based engines to build Scala `Option` values (`Some` and
+  * `None`), which are awkward to construct directly from Java.
+  */
+object OptionHelper {
+  /** Mimics a None from Java-based engine.
+    *
+    * @tparam T element type the caller wants the empty `Option` typed as
+    * @return `None`
+    */
+  def none[T]: Option[T] = {
+    // `None` is an `Option[Nothing]` and therefore a valid `Option[T]` for any
+    // T; no null value or cast is needed to obtain it.
+    None
+  }
+
+  /** Mimics a Some from Java-based engine.
+    *
+    * @param value the value to wrap; it is wrapped as-is (a null `value`
+    *   yields `Some(null)`, not `None`)
+    * @tparam T type of the wrapped value
+    * @return `Some(value)`
+    */
+  def some[T](value: T): Option[T] = {
+    Some(value)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/java/PJavaEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/PJavaEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/PJavaEventStore.scala
new file mode 100644
index 0000000..c47032c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/PJavaEventStore.scala
@@ -0,0 +1,109 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.store.java
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PropertyMap
+import org.apache.predictionio.data.store.PEventStore
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaRDD
+import org.joda.time.DateTime
+
+import scala.collection.JavaConversions
+
+/** Java-friendly facade over [[PEventStore]]: a set of operations to access
+  * the Event Store with Spark's parallelization.
+  *
+  * Each method converts its Java collection arguments to Scala and delegates
+  * to the [[PEventStore]] method of the same name. The Spark context is taken
+  * as a regular trailing parameter instead of a second parameter list.
+  */
+object PJavaEventStore {
+
+  /** Read events from Event Store
+    *
+    * @param appName return events of this app
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param sc Spark context
+    * @return JavaRDD[Event]
+    */
+  def find(
+    appName: String,
+    channelName: Option[String],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    entityType: Option[String],
+    entityId: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    sc: SparkContext): JavaRDD[Event] = {
+
+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+
+    // The Scala RDD returned here is turned into a JavaRDD by an implicit
+    // conversion (expected to come from JavaRDD's companion object).
+    PEventStore.find(
+      appName,
+      channelName,
+      startTime,
+      untilTime,
+      entityType,
+      entityId,
+      eventNamesSeq,
+      targetEntityType,
+      targetEntityId
+    )(sc)
+  }
+
+  /** Aggregate properties of entities based on these special events:
+    * \$set, \$unset, \$delete events.
+    *
+    * @param appName use events of this app
+    * @param entityType aggregate properties of the entities of this entityType
+    * @param channelName use events of this channel (default channel if it's None)
+    * @param startTime use events with eventTime >= startTime
+    * @param untilTime use events with eventTime < untilTime
+    * @param required only keep entities with these required properties defined
+    * @param sc Spark context
+    * @return JavaRDD[(String, PropertyMap)] JavaRDD of entityId and PropertyMap pair
+    */
+  def aggregateProperties(
+    appName: String,
+    entityType: String,
+    channelName: Option[String],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    required: Option[java.util.List[String]],
+    sc: SparkContext): JavaRDD[(String, PropertyMap)] = {
+
+    // `required` must be forwarded: leaving it out makes the Scala API fall
+    // back to its default (None) and silently skips the filtering the caller
+    // asked for.
+    val requiredSeq = required.map(JavaConversions.asScalaBuffer(_).toSeq)
+
+    PEventStore.aggregateProperties(
+      appName,
+      entityType,
+      channelName,
+      startTime,
+      untilTime,
+      requiredSeq
+    )(sc)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/package.scala b/data/src/main/scala/org/apache/predictionio/data/store/package.scala
new file mode 100644
index 0000000..36c592f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/package.scala
@@ -0,0 +1,21 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data
+
+/** Provides high level interfaces to the Event Store from within a prediction
+ * engine.
+ *
+ * This package object declares no members; it presumably exists only to
+ * carry the package-level documentation above.
+ */
+package object store {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
new file mode 100644
index 0000000..31937d5
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
@@ -0,0 +1,110 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.view
+
+import org.apache.predictionio.annotation.Experimental
+import org.apache.predictionio.data.storage.Event
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.SQLContext
+import org.joda.time.DateTime
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe._
+import scala.util.hashing.MurmurHash3
+
+/**
+ * :: Experimental ::
+ *
+ * Builds Spark SQL DataFrames out of events and caches them as Parquet files.
+ */
+@Experimental
+object DataView {
+  /**
+   * :: Experimental ::
+   *
+   * Create a DataFrame from events of a specified app.
+   *
+   * The result is cached as a Parquet file under `$PIO_FS_BASEDIR/view`. The
+   * file name is derived from `name`, `appName` and a hash of the time range,
+   * `version` and the serialVersionUID of `E`. If that file can be read it is
+   * returned directly; otherwise the events are read from the Event Store,
+   * converted, written to that file and loaded back from it.
+   *
+   * Note that when `untilTime` is None the current time becomes part of the
+   * hash, so such calls will in practice not reuse an earlier cached file.
+   *
+   * The environment variable `PIO_FS_BASEDIR` must be set; otherwise a
+   * `NoSuchElementException` is thrown.
+   *
+   * @param appName return events of this app
+   * @param channelName use events of this channel (default channel if it's None)
+   * @param startTime return events with eventTime >= startTime
+   * @param untilTime return events with eventTime < untilTime
+   * @param conversionFunction a function that turns raw Events into events of interest.
+   *                           If conversionFunction returns None, such events are dropped.
+   * @param name identify the DataFrame created
+   * @param version used to track changes to the conversionFunction, e.g. version = "20150413"
+   *                and update whenever the function is changed.
+   * @param sqlContext SQL context
+   * @tparam E the output type of the conversion function. The type needs to extend Product
+   *           (e.g. case class)
+   * @return a DataFrame of events
+   */
+  @Experimental
+  def create[E <: Product: TypeTag: ClassTag](
+    appName: String,
+    channelName: Option[String] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    conversionFunction: Event => Option[E],
+    name: String = "",
+    version: String = "")(sqlContext: SQLContext): DataFrame = {
+
+    @transient lazy val logger = Logger[this.type]
+
+    val sc = sqlContext.sparkContext
+
+    val beginTime = startTime.getOrElse(new DateTime(0L))
+    val endTime = untilTime.getOrElse(DateTime.now()) // fix the current time
+    // detect changes to the case class
+    // lookupAny (unlike lookup) never returns null: for a class that is not
+    // Serializable it yields a descriptor whose serialVersionUID is 0L, so
+    // this no longer fails with a NullPointerException for such classes.
+    val uid = java.io.ObjectStreamClass
+      .lookupAny(implicitly[ClassTag[E]].runtimeClass)
+      .getSerialVersionUID
+    val hash = MurmurHash3.stringHash(s"$beginTime-$endTime-$version-$uid")
+    val baseDir = s"${sys.env("PIO_FS_BASEDIR")}/view"
+    val fileName = s"$baseDir/$name-$appName-$hash.parquet"
+    try {
+      // if cached copy is found, use it
+      sqlContext.parquetFile(fileName)
+    } catch {
+      case _: java.io.FileNotFoundException =>
+        // no cached copy: grab from Storage, cache it, then load the cache
+        logger.info("Cached copy not found, reading from DB.")
+        val result: RDD[E] = PEventStore.find(
+            appName = appName,
+            channelName = channelName,
+            startTime = startTime,
+            untilTime = Some(endTime))(sc)
+          .flatMap(event => conversionFunction(event))
+        import sqlContext.implicits._ // needed for RDD.toDF()
+        val resultDF = result.toDF()
+
+        resultDF.saveAsParquetFile(fileName)
+        sqlContext.parquetFile(fileName)
+      case e: java.lang.RuntimeException =>
+        // add a hint for the corrupt-cache case, then propagate unchanged
+        if (e.toString.contains("is not a Parquet file")) {
+          logger.error(s"$fileName does not contain a valid Parquet file. " +
+            "Please delete it and try again.")
+        }
+        throw e
+    }
+  }
+}