You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/04 02:45:01 UTC

[05/11] incubator-s2graph git commit: add writeBatchWithMutate on S2GraphSink

add writeBatchWithMutate on S2GraphSink


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/7ad721cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/7ad721cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/7ad721cf

Branch: refs/heads/master
Commit: 7ad721cfa5f87916cf445eabb08f14d8490a95c7
Parents: 86dcc11
Author: Chul Kang <el...@apache.org>
Authored: Tue Apr 3 14:52:50 2018 +0900
Committer: Chul Kang <el...@apache.org>
Committed: Tue Apr 3 14:52:50 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 63 ++++++++++++++++----
 1 file changed, 52 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ad721cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 7c4c857..20c1558 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.s2jobs.task
 
-import com.typesafe.config.Config
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
 import org.apache.hadoop.hbase.HBaseConfiguration
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
 import org.apache.hadoop.util.ToolRunner
@@ -28,10 +28,15 @@ import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 /**
   * Sink
   *
@@ -214,21 +219,57 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
     if (inputDF.isStreaming) writeStream(df.writeStream)
     else {
-      val options = S2GraphHelper.toGraphFileOptions(conf)
-      val config = Management.toConfig(options.toConfigParams)
-      val input = df.rdd
+      conf.options.getOrElse("writeMethod", "mutate") match {
+        case "bulk" => writeBatchWithBulkload(df)
+        case "mutate" => writeBatchWithMutate(df)
+      }
+
+    }
+  }
+
+  private def writeBatchWithBulkload(df:DataFrame):Unit = {
+    val options = S2GraphHelper.toGraphFileOptions(conf)
+    val config = Management.toConfig(options.toConfigParams)
+    val input = df.rdd
+
+    val transformer = new SparkBulkLoaderTransformer(config, options)
+
+    implicit val reader = new RowBulkFormatReader
+    implicit val writer = new KeyValueWriter
+
+    val kvs = transformer.transform(input)
+
+    HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+
+    // finish bulk load by execute LoadIncrementHFile.
+    HFileGenerator.loadIncrementHFile(options)
+  }
+
+  private def writeBatchWithMutate(df:DataFrame):Unit = {
+    import scala.collection.JavaConversions._
+    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+    val graphConfig:Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
+    val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
+
+    val reader = new RowBulkFormatReader
 
-      val transformer = new SparkBulkLoaderTransformer(config, options)
+    val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
+    val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
 
-      implicit val reader = new RowBulkFormatReader
-      implicit val writer = new KeyValueWriter
+    df.foreachPartition{ iters =>
+      val config = ConfigFactory.parseString(serializedConfig)
+      val s2Graph = S2GraphHelper.initS2Graph(config)
 
-      val kvs = transformer.transform(input)
+      val responses = iters.grouped(groupedSize).flatMap { rows =>
+        val elements = rows.flatMap(row => reader.read(s2Graph)(row))
 
-      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+        val mutateF = s2Graph.mutateElements(elements, true)
+        Await.result(mutateF, Duration(waitTime, "seconds"))
+      }
 
-      // finish bulk load by execute LoadIncrementHFile.
-      HFileGenerator.loadIncrementHFile(options)
+      val (success, fail) = responses.toSeq.partition(r => r.isSuccess)
+      logger.info(s"success : ${success.size}, fail : ${fail.size}")
     }
   }
 }