Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/23 04:36:44 UTC

[6/9] incubator-s2graph git commit: pass local cache configuration on to S2GraphSource.

pass local cache configuration on to S2GraphSource.
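
Concretely, task options prefixed with "cache." (alongside the existing "hbase." and "db." prefixes) are now folded into the Config handed to S2GraphSource and the bulk-load transformers. A sketch of such an options map; the key names after "cache." are illustrative, since the commit only fixes the prefix and the integer conversion:

  val options = Map(
    "hbase.rootdir"  -> "hdfs://namenode/hbase",
    "db.default.url" -> "jdbc:mysql://localhost/graph_dev",
    "cache.max.size" -> "10000"   // hypothetical cache key; the value must parse as an integer
  )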


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b4dab3a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b4dab3a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b4dab3a3

Branch: refs/heads/master
Commit: b4dab3a328c457d5910ce2ade738aea2f002f30e
Parents: 5a862aa
Author: DO YUNG YOON <st...@apache.org>
Authored: Tue Apr 17 16:30:24 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Tue Apr 17 16:36:17 2018 +0900

----------------------------------------------------------------------
 .../scala/org/apache/s2graph/s2jobs/JobDescription.scala  |  1 +
 .../s2jobs/loader/LocalBulkLoaderTransformer.scala        |  3 ++-
 .../s2jobs/loader/SparkBulkLoaderTransformer.scala        |  9 +++++----
 .../main/scala/org/apache/s2graph/s2jobs/task/Sink.scala  | 10 +++++++---
 .../scala/org/apache/s2graph/s2jobs/task/Source.scala     |  7 ++++---
 .../main/scala/org/apache/s2graph/s2jobs/task/Task.scala  |  7 +++++--
 .../scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala   |  3 ++-
 7 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
index 3be318d..6abbe86 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -50,6 +50,7 @@ object JobDescription extends Logger {
       case "kafka" => new KafkaSource(conf)
       case "file"  => new FileSource(conf)
       case "hive" => new HiveSource(conf)
+      case "s2graph" => new S2GraphSource(conf)
       case _ => throw new IllegalArgumentException(s"unsupported source type : ${conf.`type`}")
     }
   }
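
With this case in place, a job description may declare a source of type "s2graph". A sketch of a conf that the factory above would now resolve (mandatory option keys taken from S2GraphSource below; values illustrative):

  val conf = TaskConf(
    name    = "restore-edges",
    `type`  = "s2graph",
    options = Map(
      "hbase.rootdir"     -> "hdfs://namenode/hbase",
      "restore.path"      -> "/tmp/restore",
      "hbase.table.names" -> "s2graph"
    )
  )
  // the match above now yields: new S2GraphSource(conf)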

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
index ad3483c..7b37509 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
@@ -23,13 +23,14 @@ import com.typesafe.config.Config
 import org.apache.s2graph.core.{GraphElement, S2Graph}
 import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
 import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 
 import scala.concurrent.ExecutionContext
 import scala.reflect.ClassTag
 
 class LocalBulkLoaderTransformer(val config: Config,
                                  val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] {
-  val s2: S2Graph = S2GraphHelper.initS2Graph(config)
+  val s2: S2Graph = S2SinkContext(config).getGraph
 
   override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = {
     val degrees = elements.flatMap { element =>
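
The swap from S2GraphHelper.initS2Graph to S2SinkContext(config).getGraph reuses one S2Graph per JVM instead of constructing a fresh instance at every call site. A minimal sketch of the idea, not S2SinkContext's actual code (the real class lives in org.apache.s2graph.spark.sql.streaming and may key its cache by config):

  import com.typesafe.config.Config
  import org.apache.s2graph.core.S2Graph
  import org.apache.s2graph.s2jobs.S2GraphHelper
  import scala.concurrent.ExecutionContext

  // hypothetical holder illustrating the caching pattern
  object CachedGraph {
    @volatile private var instance: Option[S2Graph] = None
    def getGraph(config: Config)(implicit ec: ExecutionContext): S2Graph = synchronized {
      instance.getOrElse {
        val g = S2GraphHelper.initS2Graph(config)  // the call this commit replaces
        instance = Some(g)
        g
      }
    }
  }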

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index 5f8d3e5..95847f9 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -23,6 +23,7 @@ import com.typesafe.config.Config
 import org.apache.s2graph.core.GraphElement
 import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
 import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.rdd.RDD
 
 import scala.reflect.ClassTag
@@ -40,7 +41,7 @@ class SparkBulkLoaderTransformer(val config: Config,
 
   override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
     val degrees = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2SinkContext(config).getGraph
 
       iter.flatMap { element =>
         DegreeKey.fromGraphElement(s2, element).map(_ -> 1L)
@@ -48,7 +49,7 @@ class SparkBulkLoaderTransformer(val config: Config,
     }.reduceByKey(_ + _)
 
     degrees.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2SinkContext(config).getGraph
 
       iter.map { case (degreeKey, count) =>
         writer.writeDegree(s2)(degreeKey, count)
@@ -58,7 +59,7 @@ class SparkBulkLoaderTransformer(val config: Config,
 
   override def transform[S: ClassTag, T: ClassTag](input: RDD[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): RDD[T] = {
     val elements = input.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2SinkContext(config).getGraph
 
       iter.flatMap { line =>
         reader.read(s2)(line)
@@ -66,7 +67,7 @@ class SparkBulkLoaderTransformer(val config: Config,
     }
 
     val kvs = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2SinkContext(config).getGraph
 
       iter.map(writer.write(s2)(_))
     }
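
Each of these mapPartitions blocks runs on an executor, and the graph client itself is not serializable, so it has to be constructed on the executor side. Routing that construction through S2SinkContext means partitions scheduled on the same executor JVM share one S2Graph instead of re-initializing connections per partition. The pattern, restated as a sketch with the names from the hunk above:

  val kvs = elements.mapPartitions { iter =>
    val s2 = S2SinkContext(config).getGraph  // cached per JVM, not built per partition
    iter.map(writer.write(s2)(_))
  }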

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 07a626d..0760dc6 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -216,7 +216,9 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
   private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = {
     val options = TaskConf.toGraphFileOptions(conf)
-    val config = Management.toConfig(options.toConfigParams)
+    val config = Management.toConfig(TaskConf.parseLocalCacheConfigs(conf) ++
+      TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++ options.toConfigParams)
+
     val input = df.rdd
 
     val transformer = new SparkBulkLoaderTransformer(config, options)
@@ -236,7 +238,9 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
     import scala.collection.JavaConversions._
     import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
 
-    val graphConfig: Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
+    // TODO: FIX THIS. overwrite local cache config.
+    val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
+    val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())
     val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
 
     val reader = new RowBulkFormatReader
@@ -246,7 +250,7 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
     df.foreachPartition { iters =>
       val config = ConfigFactory.parseString(serializedConfig)
-      val s2Graph = S2GraphHelper.initS2Graph(config)
+      val s2Graph = S2SinkContext(config).getGraph
 
       val responses = iters.grouped(groupedSize).flatMap { rows =>
         val elements = rows.flatMap(row => reader.read(s2Graph)(row))
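
The render/parseString round-trip lets the driver-side Config (now including the merged "cache." options) travel to executors as a plain string. A sketch of that flow, using JavaConverters where the commit uses the older JavaConversions implicits:

  import scala.collection.JavaConverters._

  val graphConfig = ConfigFactory.parseMap(mergedOptions.asJava)
                      .withFallback(ConfigFactory.load())
  val serialized  = graphConfig.root().render(ConfigRenderOptions.concise())
  // ...shipped to executors as a String, then on each partition:
  val config  = ConfigFactory.parseString(serialized)
  val s2Graph = S2SinkContext(config).getGraph   // cached per executor JVM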

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index dc5c054..f21acd1 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -113,7 +113,8 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
   override def mandatoryOptions: Set[String] = Set("hbase.rootdir", "restore.path", "hbase.table.names")
 
   override def toDF(ss: SparkSession): DataFrame = {
-    val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf)
+    val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++
+      TaskConf.parseLocalCacheConfigs(conf)
     val config = Management.toConfig(mergedConf)
 
     val snapshotPath = conf.options("hbase.rootdir")
@@ -126,17 +127,17 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
       if (columnFamily == "v") false
       else conf.options.getOrElse("build.degree", "false").toBoolean
     val elementType = conf.options.getOrElse("element.type", "IndexEdge")
+    val schema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema
 
     val cells = HFileGenerator.tableSnapshotDump(ss, config, snapshotPath,
       restorePath, tableNames, columnFamily, elementType, batchSize, labelMapping, buildDegree)
 
-
     implicit val reader = new S2GraphCellReader(elementType)
     implicit val writer = new RowDataFrameWriter
+
     val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
     val kvs = transformer.transform(cells)
 
-    val schema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema
     ss.sqlContext.createDataFrame(kvs, schema)
   }
 }
\ No newline at end of file
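
A hypothetical end-to-end read through the source (values illustrative; "cache.max.size" is a made-up key that the new parseLocalCacheConfigs forwarding would pick up; ss is a SparkSession):

  val df = new S2GraphSource(TaskConf(
    name    = "snapshot-read",
    `type`  = "s2graph",
    options = Map(
      "hbase.rootdir"     -> "hdfs://namenode/hbase",
      "restore.path"      -> "/tmp/restore",
      "hbase.table.names" -> "s2graph",
      "element.type"      -> "IndexEdge",   // the default per the code above
      "cache.max.size"    -> "10000"
    )
  )).toDF(ss)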

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index 6ba2468..89c8dcd 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -30,14 +30,17 @@ object TaskConf {
     GraphFileOptions.toOption(args)
   }
 
-  def parseHBaseConfigs(taskConf: TaskConf): Map[String, String] = {
+  def parseHBaseConfigs(taskConf: TaskConf): Map[String, Any] = {
     taskConf.options.filterKeys(_.startsWith("hbase."))
   }
 
-  def parseMetaStoreConfigs(taskConf: TaskConf): Map[String, String] = {
+  def parseMetaStoreConfigs(taskConf: TaskConf): Map[String, Any] = {
     taskConf.options.filterKeys(_.startsWith("db."))
   }
 
+  def parseLocalCacheConfigs(taskConf: TaskConf): Map[String, Any] = {
+    taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
+  }
 }
 case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty)
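
A usage sketch of the three helpers (option values illustrative). Note that parseLocalCacheConfigs calls toInt, so any "cache."-prefixed value that is not an integer will throw a NumberFormatException:

  val tc = TaskConf(name = "t", `type` = "s2graph", options = Map(
    "hbase.zookeeper.quorum" -> "localhost",
    "db.default.url"         -> "jdbc:mysql://localhost/graph_dev",
    "cache.max.size"         -> "10000"    // hypothetical key name
  ))
  TaskConf.parseHBaseConfigs(tc)      // Map("hbase.zookeeper.quorum" -> "localhost")
  TaskConf.parseMetaStoreConfigs(tc)  // Map("db.default.url" -> "jdbc:...")
  TaskConf.parseLocalCacheConfigs(tc) // Map("cache.max.size" -> 10000), values now Int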
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 0461d1e..765bfb0 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -27,6 +27,7 @@ import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
 import org.apache.s2graph.core.{Management, S2Graph}
 import org.apache.s2graph.core.types.HBaseType
 import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
@@ -65,7 +66,7 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with D
     // initialize spark context.
     super.beforeAll()
 
-    s2 = S2GraphHelper.initS2Graph(s2Config)
+    s2 = S2SinkContext(s2Config).getGraph
     initTestDataFile
   }