You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/06/15 07:34:19 UTC

[3/7] incubator-s2graph git commit: add dataframe cache option

add dataframe cache option


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d93e5de4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d93e5de4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d93e5de4

Branch: refs/heads/master
Commit: d93e5de4a89d5fbf195acb09d467b103f564a439
Parents: 63727f3
Author: Chul Kang <el...@apache.org>
Authored: Fri Jun 8 00:07:50 2018 +0900
Committer: Chul Kang <el...@apache.org>
Committed: Fri Jun 8 00:07:50 2018 +0900

----------------------------------------------------------------------
 .../src/main/scala/org/apache/s2graph/s2jobs/Job.scala   | 11 +++++++++--
 .../main/scala/org/apache/s2graph/s2jobs/task/Task.scala |  3 +--
 2 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d93e5de4/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 717d087..8f21bc2 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -29,7 +29,12 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
 
   def run() = {
     // source
-    jobDesc.sources.foreach{ source => dfMap.put(source.conf.name, source.toDF(ss))}
+    jobDesc.sources.foreach{ source =>
+      val df = source.toDF(ss)
+      if (source.conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
+
+      dfMap.put(source.conf.name, df)
+    }
     logger.debug(s"valid source DF set : ${dfMap.keySet}")
 
     // process
@@ -64,7 +69,9 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
     }
     .map { p =>
       val inputMap = p.conf.inputs.map{ input => (input,  dfMap(input)) }.toMap
-      p.conf.name -> p.execute(ss, inputMap)
+      val df = p.execute(ss, inputMap)
+      if (p.conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
+      p.conf.name -> df
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d93e5de4/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index ea42828..1210132 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -42,8 +42,7 @@ object TaskConf {
     taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
   }
 }
-
-case class TaskConf(name: String, `type`: String, inputs: Seq[String] = Nil, options: Map[String, String] = Map.empty)
+case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty, cache:Option[Boolean]=None)
 
 trait Task extends Serializable with Logger {
   val conf: TaskConf