You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/23 04:36:44 UTC
[6/9] incubator-s2graph git commit: pass local cache configuration on S2GraphSource.
pass local cache configuration on S2GraphSource.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b4dab3a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b4dab3a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b4dab3a3
Branch: refs/heads/master
Commit: b4dab3a328c457d5910ce2ade738aea2f002f30e
Parents: 5a862aa
Author: DO YUNG YOON <st...@apache.org>
Authored: Tue Apr 17 16:30:24 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Tue Apr 17 16:36:17 2018 +0900
----------------------------------------------------------------------
.../scala/org/apache/s2graph/s2jobs/JobDescription.scala | 1 +
.../s2jobs/loader/LocalBulkLoaderTransformer.scala | 3 ++-
.../s2jobs/loader/SparkBulkLoaderTransformer.scala | 9 +++++----
.../main/scala/org/apache/s2graph/s2jobs/task/Sink.scala | 10 +++++++---
.../scala/org/apache/s2graph/s2jobs/task/Source.scala | 7 ++++---
.../main/scala/org/apache/s2graph/s2jobs/task/Task.scala | 7 +++++--
.../scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala | 3 ++-
7 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
index 3be318d..6abbe86 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -50,6 +50,7 @@ object JobDescription extends Logger {
case "kafka" => new KafkaSource(conf)
case "file" => new FileSource(conf)
case "hive" => new HiveSource(conf)
+ case "s2graph" => new S2GraphSource(conf)
case _ => throw new IllegalArgumentException(s"unsupported source type : ${conf.`type`}")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
index ad3483c..7b37509 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
@@ -23,13 +23,14 @@ import com.typesafe.config.Config
import org.apache.s2graph.core.{GraphElement, S2Graph}
import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
class LocalBulkLoaderTransformer(val config: Config,
val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] {
- val s2: S2Graph = S2GraphHelper.initS2Graph(config)
+ val s2: S2Graph = S2SinkContext(config).getGraph
override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = {
val degrees = elements.flatMap { element =>
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index 5f8d3e5..95847f9 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -23,6 +23,7 @@ import com.typesafe.config.Config
import org.apache.s2graph.core.GraphElement
import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
@@ -40,7 +41,7 @@ class SparkBulkLoaderTransformer(val config: Config,
override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
val degrees = elements.mapPartitions { iter =>
- val s2 = S2GraphHelper.initS2Graph(config)
+ val s2 = S2SinkContext(config).getGraph
iter.flatMap { element =>
DegreeKey.fromGraphElement(s2, element).map(_ -> 1L)
@@ -48,7 +49,7 @@ class SparkBulkLoaderTransformer(val config: Config,
}.reduceByKey(_ + _)
degrees.mapPartitions { iter =>
- val s2 = S2GraphHelper.initS2Graph(config)
+ val s2 = S2SinkContext(config).getGraph
iter.map { case (degreeKey, count) =>
writer.writeDegree(s2)(degreeKey, count)
@@ -58,7 +59,7 @@ class SparkBulkLoaderTransformer(val config: Config,
override def transform[S: ClassTag, T: ClassTag](input: RDD[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): RDD[T] = {
val elements = input.mapPartitions { iter =>
- val s2 = S2GraphHelper.initS2Graph(config)
+ val s2 = S2SinkContext(config).getGraph
iter.flatMap { line =>
reader.read(s2)(line)
@@ -66,7 +67,7 @@ class SparkBulkLoaderTransformer(val config: Config,
}
val kvs = elements.mapPartitions { iter =>
- val s2 = S2GraphHelper.initS2Graph(config)
+ val s2 = S2SinkContext(config).getGraph
iter.map(writer.write(s2)(_))
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 07a626d..0760dc6 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -216,7 +216,9 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = {
val options = TaskConf.toGraphFileOptions(conf)
- val config = Management.toConfig(options.toConfigParams)
+ val config = Management.toConfig(TaskConf.parseLocalCacheConfigs(conf) ++
+ TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++ options.toConfigParams)
+
val input = df.rdd
val transformer = new SparkBulkLoaderTransformer(config, options)
@@ -236,7 +238,9 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
import scala.collection.JavaConversions._
import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
- val graphConfig: Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
+ // TODO: FIX THIS. The parsed cache.* values (Ints) are merged last, so they overwrite the raw string values from conf.options.
+ val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
+ val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())
val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
val reader = new RowBulkFormatReader
@@ -246,7 +250,7 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
df.foreachPartition { iters =>
val config = ConfigFactory.parseString(serializedConfig)
- val s2Graph = S2GraphHelper.initS2Graph(config)
+ val s2Graph = S2SinkContext(config).getGraph
val responses = iters.grouped(groupedSize).flatMap { rows =>
val elements = rows.flatMap(row => reader.read(s2Graph)(row))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index dc5c054..f21acd1 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -113,7 +113,8 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
override def mandatoryOptions: Set[String] = Set("hbase.rootdir", "restore.path", "hbase.table.names")
override def toDF(ss: SparkSession): DataFrame = {
- val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf)
+ val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++
+ TaskConf.parseLocalCacheConfigs(conf)
val config = Management.toConfig(mergedConf)
val snapshotPath = conf.options("hbase.rootdir")
@@ -126,17 +127,17 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
if (columnFamily == "v") false
else conf.options.getOrElse("build.degree", "false").toBoolean
val elementType = conf.options.getOrElse("element.type", "IndexEdge")
+ val schema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema
val cells = HFileGenerator.tableSnapshotDump(ss, config, snapshotPath,
restorePath, tableNames, columnFamily, elementType, batchSize, labelMapping, buildDegree)
-
implicit val reader = new S2GraphCellReader(elementType)
implicit val writer = new RowDataFrameWriter
+
val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
val kvs = transformer.transform(cells)
- val schema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema
ss.sqlContext.createDataFrame(kvs, schema)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index 6ba2468..89c8dcd 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -30,14 +30,17 @@ object TaskConf {
GraphFileOptions.toOption(args)
}
- def parseHBaseConfigs(taskConf: TaskConf): Map[String, String] = {
+ def parseHBaseConfigs(taskConf: TaskConf): Map[String, Any] = {
taskConf.options.filterKeys(_.startsWith("hbase."))
}
- def parseMetaStoreConfigs(taskConf: TaskConf): Map[String, String] = {
+ def parseMetaStoreConfigs(taskConf: TaskConf): Map[String, Any] = {
taskConf.options.filterKeys(_.startsWith("db."))
}
+ def parseLocalCacheConfigs(taskConf: TaskConf): Map[String, Any] = {
+ taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
+ }
}
case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 0461d1e..765bfb0 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -27,6 +27,7 @@ import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
import org.apache.s2graph.core.{Management, S2Graph}
import org.apache.s2graph.core.types.HBaseType
import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
@@ -65,7 +66,7 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with D
// initialize spark context.
super.beforeAll()
- s2 = S2GraphHelper.initS2Graph(s2Config)
+ s2 = S2SinkContext(s2Config).getGraph
initTestDataFile
}