You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/06/15 07:34:19 UTC
[3/7] incubator-s2graph git commit: add dataframe cache option
add dataframe cache option
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d93e5de4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d93e5de4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d93e5de4
Branch: refs/heads/master
Commit: d93e5de4a89d5fbf195acb09d467b103f564a439
Parents: 63727f3
Author: Chul Kang <el...@apache.org>
Authored: Fri Jun 8 00:07:50 2018 +0900
Committer: Chul Kang <el...@apache.org>
Committed: Fri Jun 8 00:07:50 2018 +0900
----------------------------------------------------------------------
.../src/main/scala/org/apache/s2graph/s2jobs/Job.scala | 11 +++++++++--
.../main/scala/org/apache/s2graph/s2jobs/task/Task.scala | 3 +--
2 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d93e5de4/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 717d087..8f21bc2 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -29,7 +29,12 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
def run() = {
// source
- jobDesc.sources.foreach{ source => dfMap.put(source.conf.name, source.toDF(ss))}
+ jobDesc.sources.foreach{ source =>
+ val df = source.toDF(ss)
+ if (source.conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
+
+ dfMap.put(source.conf.name, df)
+ }
logger.debug(s"valid source DF set : ${dfMap.keySet}")
// process
@@ -64,7 +69,9 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
}
.map { p =>
val inputMap = p.conf.inputs.map{ input => (input, dfMap(input)) }.toMap
- p.conf.name -> p.execute(ss, inputMap)
+ val df = p.execute(ss, inputMap)
+ if (p.conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
+ p.conf.name -> df
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d93e5de4/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index ea42828..1210132 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -42,8 +42,7 @@ object TaskConf {
taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
}
}
-
-case class TaskConf(name: String, `type`: String, inputs: Seq[String] = Nil, options: Map[String, String] = Map.empty)
+case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty, cache:Option[Boolean]=None)
trait Task extends Serializable with Logger {
val conf: TaskConf