Posted to commits@predictionio.apache.org by do...@apache.org on 2018/10/04 06:36:46 UTC
predictionio git commit: [PIO-176] Hack in build.sbt for switching between Spark 1.x and 2.x should be cleaned up. [Forced Update!]
Repository: predictionio
Updated Branches:
refs/heads/develop b3fba2eac -> 02377e8dc (forced update)
[PIO-176] Hack in build.sbt for switching between Spark 1.x and 2.x should be cleaned up.
Closes #477
Project: http://git-wip-us.apache.org/repos/asf/predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/predictionio/commit/02377e8d
Tree: http://git-wip-us.apache.org/repos/asf/predictionio/tree/02377e8d
Diff: http://git-wip-us.apache.org/repos/asf/predictionio/diff/02377e8d
Branch: refs/heads/develop
Commit: 02377e8dc8bb7a564b9c63ff58b4ca7b0b90c3bf
Parents: 0193c8b
Author: saurabh gulati <sa...@gmail.com>
Authored: Wed Oct 3 23:32:27 2018 -0700
Committer: Donald Szeto <do...@apache.org>
Committed: Wed Oct 3 23:36:23 2018 -0700
----------------------------------------------------------------------
build.sbt | 2 -
.../data/store/python/PPythonEventStore.scala | 146 +++++++++++++++++++
.../predictionio/data/view/DataView.scala | 9 +-
.../data/SparkVersionDependent.scala | 30 ----
.../data/store/python/PPythonEventStore.scala | 146 -------------------
.../data/storage/jdbc/JDBCPEvents.scala | 8 +-
.../tools/export/EventsToFile.scala | 6 +-
7 files changed, 153 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 9eea3e0..533fbef 100644
--- a/build.sbt
+++ b/build.sbt
@@ -129,8 +129,6 @@ val data = (project in file("data")).
settings(commonSettings: _*).
settings(commonTestSettings: _*).
enablePlugins(GenJavadocPlugin).
- settings(unmanagedSourceDirectories in Compile +=
- sourceDirectory.value / s"main/spark-${majorVersion(sparkVersion.value)}").
disablePlugins(sbtassembly.AssemblyPlugin)
val core = (project in file("core")).
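For context, the two deleted lines above are the PIO-176 "hack" itself: they grafted a Spark-major-version-specific directory (data/src/main/spark-1 or spark-2) into the compile sources so the same module could build against either Spark line. A minimal sbt sketch of that pattern follows; the sparkVersion key, the majorVersion helper, and the default version string are assumptions for illustration, not the project's actual build definition.

// Sketch of the removed version-switching pattern (sbt 0.13-style syntax).
// sparkVersion, majorVersion, and the default "2.1.1" are illustrative.
val sparkVersion = settingKey[String]("Spark version to build against")

def majorVersion(version: String): Int = version.split('.').head.toInt

lazy val data = (project in file("data")).
  settings(
    sparkVersion := sys.props.getOrElse("spark.version", "2.1.1"),
    // Adds e.g. data/src/main/spark-2 as an extra source root.
    unmanagedSourceDirectories in Compile +=
      sourceDirectory.value / s"main/spark-${majorVersion(sparkVersion.value)}"
  )

With Spark 1.x support dropped, the spark-2 sources move under src/main/scala (the next two diffs) and the extra source root is no longer needed.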
http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala
new file mode 100644
index 0000000..1d03634
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.predictionio.data.store.python
+
+import java.sql.Timestamp
+
+import org.apache.predictionio.data.store.PEventStore
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.joda.time.DateTime
+
+
+/** This object provides a set of operations to access the Event Store
+ * with Spark's parallelization
+ */
+object PPythonEventStore {
+
+
+ /** Read events from Event Store
+ *
+ * @param appName return events of this app
+ * @param channelName return events of this channel (default channel if null)
+ * @param startTime return events with eventTime >= startTime
+ * @param untilTime return events with eventTime < untilTime
+ * @param entityType return events of this entityType
+ * @param entityId return events of this entityId
+ * @param eventNames return events with any of these event names.
+ * @param targetEntityType return events of this targetEntityType:
+ * - None means no restriction on targetEntityType
+ * - Some(None) means no targetEntityType for this event
+ * - Some(Some(x)) means targetEntityType should match x.
+ * @param targetEntityId return events of this targetEntityId
+ * - None means no restriction on targetEntityId
+ * - Some(None) means no targetEntityId for this event
+ * - Some(Some(x)) means targetEntityId should match x.
+ * @param spark Spark session
+ * @return DataFrame
+ */
+ def find(
+ appName: String,
+ channelName: String,
+ startTime: Timestamp,
+ untilTime: Timestamp,
+ entityType: String,
+ entityId: String,
+ eventNames: Array[String],
+ targetEntityType: String,
+ targetEntityId: String
+ )(spark: SparkSession): DataFrame = {
+ import spark.implicits._
+ val colNames: Seq[String] =
+ Seq(
+ "eventId",
+ "event",
+ "entityType",
+ "entityId",
+ "targetEntityType",
+ "targetEntityId",
+ "eventTime",
+ "tags",
+ "prId",
+ "creationTime",
+ "fields"
+ )
+ PEventStore.find(appName,
+ Option(channelName),
+ Option(startTime).map(t => new DateTime(t.getTime)),
+ Option(untilTime).map(t => new DateTime(t.getTime)),
+ Option(entityType),
+ Option(entityId),
+ Option(eventNames),
+ Option(Option(targetEntityType)),
+ Option(Option(targetEntityId)))(spark.sparkContext).map { e =>
+ (
+ e.eventId,
+ e.event,
+ e.entityType,
+ e.entityId,
+ e.targetEntityType.orNull,
+ e.targetEntityId.orNull,
+ new Timestamp(e.eventTime.getMillis),
+ e.tags.mkString("\t"),
+ e.prId.orNull,
+ new Timestamp(e.creationTime.getMillis),
+ e.properties.fields.mapValues(_.values.toString)
+ )
+ }.toDF(colNames: _*)
+ }
+
+ /** Aggregate properties of entities based on these special events:
+ * \$set, \$unset, and \$delete.
+ *
+ * @param appName use events of this app
+ * @param entityType aggregate properties of the entities of this entityType
+ * @param channelName use events of this channel (default channel if null)
+ * @param startTime use events with eventTime >= startTime
+ * @param untilTime use events with eventTime < untilTime
+ * @param required only keep entities with these required properties defined
+ * @param spark Spark session
+ * @return DataFrame of entityId and PropertyMap pairs
+ */
+ def aggregateProperties(
+ appName: String,
+ entityType: String,
+ channelName: String,
+ startTime: Timestamp,
+ untilTime: Timestamp,
+ required: Array[String]
+ )
+ (spark: SparkSession): DataFrame = {
+ import spark.implicits._
+ val colNames: Seq[String] =
+ Seq(
+ "entityId",
+ "firstUpdated",
+ "lastUpdated",
+ "fields"
+ )
+ PEventStore.aggregateProperties(appName,
+ entityType,
+ Option(channelName),
+ Option(startTime).map(t => new DateTime(t.getTime)),
+ Option(untilTime).map(t => new DateTime(t.getTime)),
+ Option(required.toSeq))(spark.sparkContext).map { x =>
+ val m = x._2
+ (x._1,
+ new Timestamp(m.firstUpdated.getMillis),
+ new Timestamp(m.lastUpdated.getMillis),
+ m.fields.mapValues(_.values.toString)
+ )
+ }.toDF(colNames: _*)
+ }
+}
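Assuming the object above is on the classpath, a hypothetical call from Scala could look like the sketch below; the app name, timestamp, and event names are invented. Every argument passes through Option(...) internally, so null means an unset filter, with one wrinkle visible in the code: the target-entity arguments are double-wrapped (Option(Option(x))), so a null there yields Some(None), selecting events that have no target entity rather than lifting the restriction.

import java.sql.Timestamp

import org.apache.predictionio.data.store.python.PPythonEventStore
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pio-events").getOrCreate()

// "MyTestApp", the date, and the event names are placeholders.
val df = PPythonEventStore.find(
  appName = "MyTestApp",
  channelName = null,                                   // default channel
  startTime = Timestamp.valueOf("2018-01-01 00:00:00"),
  untilTime = null,                                     // no upper bound
  entityType = "user",
  entityId = null,
  eventNames = Array("view", "buy"),
  targetEntityType = null,                              // Some(None): events with no target entity
  targetEntityId = null
)(spark)

df.show(20)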
http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
index 1c47e10..ca92e8f 100644
--- a/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
@@ -20,14 +20,10 @@ package org.apache.predictionio.data.view
import org.apache.predictionio.annotation.Experimental
import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.SparkVersionDependent
-
import grizzled.slf4j.Logger
import org.apache.predictionio.data.store.PEventStore
-
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.SparkContext
import org.joda.time.DateTime
@@ -52,7 +48,6 @@ object DataView {
* @param name identify the DataFrame created
* @param version used to track changes to the conversionFunction, e.g. version = "20150413"
* and update whenever the function is changed.
- * @param sqlContext SQL context
* @tparam E the output type of the conversion function. The type needs to extend Product
* (e.g. case class)
* @return a DataFrame of events
@@ -69,7 +64,7 @@ object DataView {
@transient lazy val logger = Logger[this.type]
- val sqlSession = SparkVersionDependent.sqlSession(sc)
+ val sqlSession = SparkSession.builder().getOrCreate()
val beginTime = startTime match {
case Some(t) => t
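The one-line replacement is safe because, on Spark 2.x, SparkSession.builder().getOrCreate() attaches to the session already active in the JVM rather than constructing a second one. A quick sketch of that behavior, assuming a local session:

import org.apache.spark.sql.SparkSession

val session = SparkSession.builder().master("local[*]").getOrCreate()
val sc = session.sparkContext

// Subsequent getOrCreate() calls return the same active session,
// bound to the same underlying SparkContext.
assert(SparkSession.builder().getOrCreate() eq session)
assert(SparkSession.builder().getOrCreate().sparkContext eq sc)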
http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala
----------------------------------------------------------------------
diff --git a/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala b/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala
deleted file mode 100644
index 3d07bdf..0000000
--- a/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SparkSession
-
-object SparkVersionDependent {
-
- def sqlSession(sc: SparkContext): SparkSession = {
- SparkSession.builder().getOrCreate()
- }
-
-}
http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala b/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
deleted file mode 100644
index 1d03634..0000000
--- a/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.predictionio.data.store.python
-
-import java.sql.Timestamp
-
-import org.apache.predictionio.data.store.PEventStore
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.joda.time.DateTime
-
-
-/** This object provides a set of operations to access the Event Store
- * with Spark's parallelization
- */
-object PPythonEventStore {
-
-
- /** Read events from Event Store
- *
- * @param appName return events of this app
- * @param channelName return events of this channel (default channel if null)
- * @param startTime return events with eventTime >= startTime
- * @param untilTime return events with eventTime < untilTime
- * @param entityType return events of this entityType
- * @param entityId return events of this entityId
- * @param eventNames return events with any of these event names.
- * @param targetEntityType return events of this targetEntityType:
- * - None means no restriction on targetEntityType
- * - Some(None) means no targetEntityType for this event
- * - Some(Some(x)) means targetEntityType should match x.
- * @param targetEntityId return events of this targetEntityId
- * - None means no restriction on targetEntityId
- * - Some(None) means no targetEntityId for this event
- * - Some(Some(x)) means targetEntityId should match x.
- * @param spark Spark session
- * @return DataFrame
- */
- def find(
- appName: String,
- channelName: String,
- startTime: Timestamp,
- untilTime: Timestamp,
- entityType: String,
- entityId: String,
- eventNames: Array[String],
- targetEntityType: String,
- targetEntityId: String
- )(spark: SparkSession): DataFrame = {
- import spark.implicits._
- val colNames: Seq[String] =
- Seq(
- "eventId",
- "event",
- "entityType",
- "entityId",
- "targetEntityType",
- "targetEntityId",
- "eventTime",
- "tags",
- "prId",
- "creationTime",
- "fields"
- )
- PEventStore.find(appName,
- Option(channelName),
- Option(startTime).map(t => new DateTime(t.getTime)),
- Option(untilTime).map(t => new DateTime(t.getTime)),
- Option(entityType),
- Option(entityId),
- Option(eventNames),
- Option(Option(targetEntityType)),
- Option(Option(targetEntityId)))(spark.sparkContext).map { e =>
- (
- e.eventId,
- e.event,
- e.entityType,
- e.entityId,
- e.targetEntityType.orNull,
- e.targetEntityId.orNull,
- new Timestamp(e.eventTime.getMillis),
- e.tags.mkString("\t"),
- e.prId.orNull,
- new Timestamp(e.creationTime.getMillis),
- e.properties.fields.mapValues(_.values.toString)
- )
- }.toDF(colNames: _*)
- }
-
- /** Aggregate properties of entities based on these special events:
- * \$set, \$unset, and \$delete.
- *
- * @param appName use events of this app
- * @param entityType aggregate properties of the entities of this entityType
- * @param channelName use events of this channel (default channel if null)
- * @param startTime use events with eventTime >= startTime
- * @param untilTime use events with eventTime < untilTime
- * @param required only keep entities with these required properties defined
- * @param spark Spark session
- * @return DataFrame of entityId and PropertyMap pairs
- */
- def aggregateProperties(
- appName: String,
- entityType: String,
- channelName: String,
- startTime: Timestamp,
- untilTime: Timestamp,
- required: Array[String]
- )
- (spark: SparkSession): DataFrame = {
- import spark.implicits._
- val colNames: Seq[String] =
- Seq(
- "entityId",
- "firstUpdated",
- "lastUpdated",
- "fields"
- )
- PEventStore.aggregateProperties(appName,
- entityType,
- Option(channelName),
- Option(startTime).map(t => new DateTime(t.getTime)),
- Option(untilTime).map(t => new DateTime(t.getTime)),
- Option(required.toSeq))(spark.sparkContext).map { x =>
- val m = x._2
- (x._1,
- new Timestamp(m.firstUpdated.getMillis),
- new Timestamp(m.lastUpdated.getMillis),
- m.fields.mapValues(_.values.toString)
- )
- }.toDF(colNames: _*)
- }
-}
http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
index d31e592..4fa8b9f 100644
--- a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
@@ -20,12 +20,10 @@ package org.apache.predictionio.data.storage.jdbc
import java.sql.{DriverManager, ResultSet}
import com.github.nscala_time.time.Imports._
-import org.apache.predictionio.data.storage.{
- DataMap, Event, PEvents, StorageClientConfig}
-import org.apache.predictionio.data.SparkVersionDependent
+import org.apache.predictionio.data.storage.{DataMap, Event, PEvents, StorageClientConfig}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{JdbcRDD, RDD}
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{SaveMode, SparkSession}
import org.json4s.JObject
import org.json4s.native.Serialization
import scalikejdbc._
@@ -121,7 +119,7 @@ class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String
}
def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
- val sqlSession = SparkVersionDependent.sqlSession(sc)
+ val sqlSession = SparkSession.builder().getOrCreate()
import sqlSession.implicits._
val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
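For readers tracing the write path this hunk touches: once the session's implicits are in scope, an RDD of any Product type lifts to a DataFrame, which can then be written over JDBC with a SaveMode. A sketch under assumed names (EventRow, writeRows, and the connection placeholders are illustrative, not the class's actual internals):

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

case class EventRow(id: String, event: String, entityId: String)

def writeRows(rows: RDD[EventRow], url: String, table: String): Unit = {
  val sqlSession = SparkSession.builder().getOrCreate()
  import sqlSession.implicits._

  val props = new Properties()  // JDBC user/password would be set here
  rows.toDF().write.mode(SaveMode.Append).jdbc(url, table, props)
}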
http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
index 0372a44..9b6dbb5 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
@@ -21,14 +21,12 @@ package org.apache.predictionio.tools.export
import org.apache.predictionio.controller.Utils
import org.apache.predictionio.data.storage.EventJson4sSupport
import org.apache.predictionio.data.storage.Storage
-import org.apache.predictionio.data.SparkVersionDependent
import org.apache.predictionio.tools.Runner
import org.apache.predictionio.workflow.WorkflowContext
import org.apache.predictionio.workflow.WorkflowUtils
import org.apache.predictionio.workflow.CleanupFunctions
-
import grizzled.slf4j.Logging
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{SaveMode, SparkSession}
import org.json4s.native.Serialization._
case class EventsToFileArgs(
@@ -93,7 +91,7 @@ object EventsToFile extends Logging {
mode = "Export",
batch = "App ID " + args.appId + channelStr,
executorEnv = Runner.envStringToMap(args.env))
- val sqlSession = SparkVersionDependent.sqlSession(sc)
+ val sqlSession = SparkSession.builder().getOrCreate()
val events = Storage.getPEvents()
val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
val jsonStringRdd = eventsRdd.map(write(_))
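The context above ends with the exported events serialized to JSON strings. On Spark 2.x, an export like this typically finishes by lifting that RDD to a Dataset[String] and writing it as text, one JSON document per line; a minimal sketch with placeholder data and output path:

import org.apache.spark.sql.{SaveMode, SparkSession}

val sqlSession = SparkSession.builder().getOrCreate()
import sqlSession.implicits._

// Placeholder stand-in for the jsonStringRdd built above.
val jsonStringRdd =
  sqlSession.sparkContext.parallelize(Seq("""{"event":"view"}""", """{"event":"buy"}"""))

jsonStringRdd.toDS().write.mode(SaveMode.ErrorIfExists).text("/tmp/pio-export")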