You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/04 02:45:04 UTC
[08/11] incubator-s2graph git commit: merge writeBatch for S2GraphSink
merge writeBatch for S2GraphSink
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/fc9fde7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/fc9fde7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/fc9fde7f
Branch: refs/heads/master
Commit: fc9fde7fbde1bcc18f2dec41a54ec70af01cd80c
Parents: c47359c c129ed0
Author: Chul Kang <el...@apache.org>
Authored: Tue Apr 3 18:47:13 2018 +0900
Committer: Chul Kang <el...@apache.org>
Committed: Tue Apr 3 18:47:13 2018 +0900
----------------------------------------------------------------------
.../apache/s2graph/s2jobs/S2GraphHelper.scala | 9 -----
.../s2jobs/loader/GraphFileOptions.scala | 6 ++++
.../s2graph/s2jobs/serde/Transformer.scala | 2 +-
.../org/apache/s2graph/s2jobs/task/Sink.scala | 35 ++++++++++----------
.../org/apache/s2graph/s2jobs/task/Task.scala | 9 +++++
.../apache/s2graph/s2jobs/BaseSparkTest.scala | 2 --
.../apache/s2graph/s2jobs/task/SinkTest.scala | 31 ++++++++++++-----
7 files changed, 57 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fc9fde7f/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 20c1558,0dce19a..77e8cb1
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@@ -214,21 -209,8 +214,8 @@@ class S2graphSink(queryName: String, co
override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
- override def write(inputDF: DataFrame): Unit = {
- val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
-
- if (inputDF.isStreaming) writeStream(df.writeStream)
- else {
- conf.options.getOrElse("writeMethod", "mutate") match {
- case "bulk" => writeBatchWithBulkload(df)
- case "mutate" => writeBatchWithMutate(df)
- }
-
- }
- }
-
- private def writeBatchWithBulkload(df:DataFrame):Unit = {
- val options = S2GraphHelper.toGraphFileOptions(conf)
- private def bulkload(df: DataFrame): Unit = {
++ private def writeBatchBulkload(df: DataFrame): Unit = {
+ val options = TaskConf.toGraphFileOptions(conf)
val config = Management.toConfig(options.toConfigParams)
val input = df.rdd
@@@ -245,32 -227,13 +232,46 @@@
HFileGenerator.loadIncrementHFile(options)
}
+ private def writeBatchWithMutate(df:DataFrame):Unit = {
+ import scala.collection.JavaConversions._
+ import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
- val graphConfig:Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
++ val graphConfig: Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
+ val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
+
+ val reader = new RowBulkFormatReader
+
+ val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
+ val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
+
- df.foreachPartition{ iters =>
++ df.foreachPartition { iters =>
+ val config = ConfigFactory.parseString(serializedConfig)
+ val s2Graph = S2GraphHelper.initS2Graph(config)
+
+ val responses = iters.grouped(groupedSize).flatMap { rows =>
+ val elements = rows.flatMap(row => reader.read(s2Graph)(row))
+
+ val mutateF = s2Graph.mutateElements(elements, true)
+ Await.result(mutateF, Duration(waitTime, "seconds"))
+ }
+
+ val (success, fail) = responses.toSeq.partition(r => r.isSuccess)
+ logger.info(s"success : ${success.size}, fail : ${fail.size}")
+ }
+ }
++
+ override def write(inputDF: DataFrame): Unit = {
+ val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+
+ if (inputDF.isStreaming) writeStream(df.writeStream)
+ else {
- bulkload(df)
++ conf.options.getOrElse("writeMethod", "mutate") match {
++ case "mutate" => writeBatchWithMutate(df)
++ case "bulk" => writeBatchBulkload(df)
++ case writeMethod:String => throw new IllegalArgumentException(s"unsupported write method '$writeMethod' (valid method: mutate, bulk)")
++
++ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fc9fde7f/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 4f02808,4f02808..0461d1e
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@@ -33,8 -33,8 +33,6 @@@ import org.scalatest.{BeforeAndAfterAll
import scala.util.Try
class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
-- private val master = "local[2]"
-- private val appName = "example-spark"
protected val options = GraphFileOptions(
input = "/tmp/test.txt",
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fc9fde7f/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
index db96328,a21b3df..02b724e
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@@ -26,6 -26,6 +26,10 @@@ import org.apache.spark.sql.DataFram
import scala.collection.JavaConverters._
class SinkTest extends BaseSparkTest {
++ override def beforeAll(): Unit = {
++ super.beforeAll()
++ initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
++ }
def toDataFrame(edges: Seq[String]): DataFrame = {
import spark.sqlContext.implicits._
val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
@@@ -42,13 -42,12 +46,11 @@@
}.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
}
- test("S2graphSink writeBatch.") {
- val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
-
+ test("S2graphSink writeBatchWithBulkload") {
- initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
-
val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
val df = toDataFrame(Seq(bulkEdgeString))
- val args = options.toCommand.grouped(2).map { kv =>
+ val args = Map("writeMethod" -> "bulk") ++
+ options.toCommand.grouped(2).map { kv =>
kv.head -> kv.last
}.toMap
@@@ -59,25 -58,5 +61,38 @@@
val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
s2Edges.foreach { edge => println(edge) }
++
++ val filteredEdges = s2Edges.filter{ edge =>
++ edge.srcVertex.innerIdVal.toString == "a" &&
++ edge.tgtVertex.innerIdVal.toString == "b" &&
++ edge.label() == "friends"
++ }
++
++ assert(filteredEdges.size == 1)
+ }
+
+ test("S2graphSink writeBatchWithMutate") {
- initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
-
- val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":20}"
++ val bulkEdgeString = "1416236400000\tinsert\tedge\tb\tc\tfriends\t{\"since\":1316236400000,\"score\":20}"
+ val df = toDataFrame(Seq(bulkEdgeString))
+ val args = Map("writeMethod" -> "mutate") ++
- options.toCommand.grouped(2).map { kv =>
- kv.head -> kv.last
- }.toMap
++ options.toCommand.grouped(2).map { kv => kv.head -> kv.last }.toMap
+
+ val conf = TaskConf("test", "sql", Seq("input"), args)
+
+ val sink = new S2graphSink("testQuery", conf)
+ sink.write(df)
+
+ val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+ s2Edges.foreach { edge => println(edge) }
++
++ val filteredEdges = s2Edges.filter{ edge =>
++ edge.srcVertex.innerIdVal.toString == "b" &&
++ edge.tgtVertex.innerIdVal.toString == "c" &&
++ edge.getTs() == 1416236400000L &&
++ edge.label() == "friends"
++ }
++
++ assert(filteredEdges.size == 1)
}
+
}