You are viewing a plain text version of this content. The canonical link for it is here.
Posted on 2018/04/04 02:45:04 UTC

[08/11] incubator-s2graph git commit: merge writeBatch for S2GraphSink

merge writeBatch for S2GraphSink


Branch: refs/heads/master
Commit: fc9fde7fbde1bcc18f2dec41a54ec70af01cd80c
Parents: c47359c c129ed0
Author: Chul Kang <>
Authored: Tue Apr 3 18:47:13 2018 +0900
Committer: Chul Kang <>
Committed: Tue Apr 3 18:47:13 2018 +0900

 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |  9 -----
 .../s2jobs/loader/GraphFileOptions.scala        |  6 ++++
 .../s2graph/s2jobs/serde/Transformer.scala      |  2 +-
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 35 ++++++++++----------
 .../org/apache/s2graph/s2jobs/task/Task.scala   |  9 +++++
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |  2 --
 .../apache/s2graph/s2jobs/task/SinkTest.scala   | 31 ++++++++++++-----
 7 files changed, 57 insertions(+), 37 deletions(-)
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 20c1558,0dce19a..77e8cb1
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@@ -214,21 -209,8 +214,8 @@@ class S2graphSink(queryName: String, co
    override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
-   override def write(inputDF: DataFrame): Unit = {
-     val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
-     if (inputDF.isStreaming) writeStream(df.writeStream)
-     else {
-       conf.options.getOrElse("writeMethod", "mutate") match {
-         case "bulk" => writeBatchWithBulkload(df)
-         case "mutate" => writeBatchWithMutate(df)
-       }
-     }
-   }
-   private def writeBatchWithBulkload(df:DataFrame):Unit = {
-     val options = S2GraphHelper.toGraphFileOptions(conf)
 -  private def bulkload(df: DataFrame): Unit = {
++  private def writeBatchBulkload(df: DataFrame): Unit = {
+     val options = TaskConf.toGraphFileOptions(conf)
      val config = Management.toConfig(options.toConfigParams)
      val input = df.rdd
@@@ -245,32 -227,13 +232,46 @@@
 +  private def writeBatchWithMutate(df:DataFrame):Unit = {
 +    import scala.collection.JavaConversions._
 +    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
-     val graphConfig:Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
++    val graphConfig: Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
 +    val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
 +    val reader = new RowBulkFormatReader
 +    val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
 +    val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
-     df.foreachPartition{ iters =>
++    df.foreachPartition { iters =>
 +      val config = ConfigFactory.parseString(serializedConfig)
 +      val s2Graph = S2GraphHelper.initS2Graph(config)
 +      val responses = iters.grouped(groupedSize).flatMap { rows =>
 +        val elements = rows.flatMap(row => reader.read(s2Graph)(row))
 +        val mutateF = s2Graph.mutateElements(elements, true)
 +        Await.result(mutateF, Duration(waitTime, "seconds"))
 +      }
 +      val (success, fail) = responses.toSeq.partition(r => r.isSuccess)
 +      logger.info(s"success : ${success.size}, fail : ${fail.size}")
 +    }
 +  }
+   override def write(inputDF: DataFrame): Unit = {
+     val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+     if (inputDF.isStreaming) writeStream(df.writeStream)
+     else {
 -      bulkload(df)
++      conf.options.getOrElse("writeMethod", "mutate") match {
++        case "mutate" => writeBatchWithMutate(df)
++        case "bulk" => writeBatchBulkload(df)
++        case writeMethod:String => throw new IllegalArgumentException(s"unsupported write method '$writeMethod' (valid method: mutate, bulk)")
++      }
+     }
+   }
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 4f02808,4f02808..0461d1e
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@@ -33,8 -33,8 +33,6 @@@ import org.scalatest.{BeforeAndAfterAll
  import scala.util.Try
  class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
--  private val master = "local[2]"
--  private val appName = "example-spark"
    protected val options = GraphFileOptions(
      input = "/tmp/test.txt",
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
index db96328,a21b3df..02b724e
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@@ -26,6 -26,6 +26,10 @@@ import org.apache.spark.sql.DataFram
  import scala.collection.JavaConverters._
  class SinkTest extends BaseSparkTest {
++  override def beforeAll(): Unit = {
++    super.beforeAll()
++    initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
++  }
    def toDataFrame(edges: Seq[String]): DataFrame = {
      import spark.sqlContext.implicits._
      val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
@@@ -42,13 -42,12 +46,11 @@@
      }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
 -  test("S2graphSink writeBatch.") {
 -    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
 +  test("S2graphSink writeBatchWithBulkload") {
-     initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
      val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
      val df = toDataFrame(Seq(bulkEdgeString))
 -    val args = options.toCommand.grouped(2).map { kv =>
 +    val args = Map("writeMethod" -> "bulk") ++
 +      options.toCommand.grouped(2).map { kv =>
        kv.head -> kv.last
@@@ -59,25 -58,5 +61,38 @@@
     val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
      s2Edges.foreach { edge => println(edge) }
++    val filteredEdges = s2Edges.filter{ edge =>
++      edge.srcVertex.innerIdVal.toString == "a" &&
++        edge.tgtVertex.innerIdVal.toString == "b" &&
++        edge.label() == "friends"
++    }
++    assert(filteredEdges.size == 1)
 +  }
 +  test("S2graphSink writeBatchWithMutate") {
-     initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
-     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":20}"
++    val bulkEdgeString = "1416236400000\tinsert\tedge\tb\tc\tfriends\t{\"since\":1316236400000,\"score\":20}"
 +    val df = toDataFrame(Seq(bulkEdgeString))
 +    val args = Map("writeMethod" -> "mutate") ++
-       options.toCommand.grouped(2).map { kv =>
-         kv.head -> kv.last
-       }.toMap
++      options.toCommand.grouped(2).map { kv => kv.head -> kv.last }.toMap
 +    val conf = TaskConf("test", "sql", Seq("input"), args)
 +    val sink = new S2graphSink("testQuery", conf)
 +    sink.write(df)
 +    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
 +    s2Edges.foreach { edge => println(edge) }
++    val filteredEdges = s2Edges.filter{ edge =>
++      edge.srcVertex.innerIdVal.toString == "b" &&
++      edge.tgtVertex.innerIdVal.toString == "c" &&
++      edge.getTs() == 1416236400000L &&
++      edge.label() == "friends"
++    }
++    assert(filteredEdges.size == 1)