Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/04 02:45:01 UTC
[05/11] incubator-s2graph git commit: add writeBatchWithMutate on S2GraphSink
add writeBatchWithMutate on S2GraphSink
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/7ad721cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/7ad721cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/7ad721cf
Branch: refs/heads/master
Commit: 7ad721cfa5f87916cf445eabb08f14d8490a95c7
Parents: 86dcc11
Author: Chul Kang <el...@apache.org>
Authored: Tue Apr 3 14:52:50 2018 +0900
Committer: Chul Kang <el...@apache.org>
Committed: Tue Apr 3 14:52:50 2018 +0900
----------------------------------------------------------------------
.../org/apache/s2graph/s2jobs/task/Sink.scala | 63 ++++++++++++++++----
1 file changed, 52 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ad721cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 7c4c857..20c1558 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.s2jobs.task
-import com.typesafe.config.Config
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.util.ToolRunner
@@ -28,10 +28,15 @@ import org.apache.s2graph.s2jobs.S2GraphHelper
import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
import org.elasticsearch.spark.sql.EsSparkSQL
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
/**
* Sink
*
@@ -214,21 +219,57 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
if (inputDF.isStreaming) writeStream(df.writeStream)
else {
- val options = S2GraphHelper.toGraphFileOptions(conf)
- val config = Management.toConfig(options.toConfigParams)
- val input = df.rdd
+ conf.options.getOrElse("writeMethod", "mutate") match {
+ case "bulk" => writeBatchWithBulkload(df)
+ case "mutate" => writeBatchWithMutate(df)
+ }
+
+ }
+ }
+
+ private def writeBatchWithBulkload(df:DataFrame):Unit = {
+ val options = S2GraphHelper.toGraphFileOptions(conf)
+ val config = Management.toConfig(options.toConfigParams)
+ val input = df.rdd
+
+ val transformer = new SparkBulkLoaderTransformer(config, options)
+
+ implicit val reader = new RowBulkFormatReader
+ implicit val writer = new KeyValueWriter
+
+ val kvs = transformer.transform(input)
+
+ HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+
+ // finish the bulk load by executing LoadIncrementalHFiles.
+ HFileGenerator.loadIncrementHFile(options)
+ }
+
+ private def writeBatchWithMutate(df:DataFrame):Unit = {
+ import scala.collection.JavaConversions._
+ import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+ val graphConfig:Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
+ val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
+
+ val reader = new RowBulkFormatReader
- val transformer = new SparkBulkLoaderTransformer(config, options)
+ val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
+ val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
- implicit val reader = new RowBulkFormatReader
- implicit val writer = new KeyValueWriter
+ df.foreachPartition{ iters =>
+ val config = ConfigFactory.parseString(serializedConfig)
+ val s2Graph = S2GraphHelper.initS2Graph(config)
- val kvs = transformer.transform(input)
+ val responses = iters.grouped(groupedSize).flatMap { rows =>
+ val elements = rows.flatMap(row => reader.read(s2Graph)(row))
- HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+ val mutateF = s2Graph.mutateElements(elements, true)
+ Await.result(mutateF, Duration(waitTime, "seconds"))
+ }
- // finish bulk load by execute LoadIncrementHFile.
- HFileGenerator.loadIncrementHFile(options)
+ val (success, fail) = responses.toSeq.partition(r => r.isSuccess)
+ logger.info(s"success : ${success.size}, fail : ${fail.size}")
}
}
}
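----------------------------------------------------------------------

With this change, the batch write path of S2graphSink is selected by the
task option "writeMethod": "mutate" (the default) writes elements through
mutateElements directly, while "bulk" keeps the existing HFile bulkload
path. A minimal sketch of wiring that option through a task config,
assuming the TaskConf(name, type, inputs, options) shape and the
Sink.write(DataFrame) entry point used elsewhere in s2jobs; the task and
query names are placeholders:

    import org.apache.s2graph.s2jobs.task.{S2graphSink, TaskConf}
    import org.apache.spark.sql.DataFrame

    // Hypothetical wiring of the new writeMethod option.
    def writeToS2graph(df: DataFrame): Unit = {
      val conf = TaskConf(
        name = "to_s2graph",
        `type` = "s2graph",
        inputs = Seq("edges"),
        options = Map("writeMethod" -> "mutate") // or "bulk" for bulkload
      )
      new S2graphSink("sample_query", conf).write(df)
    }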
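The mutate path has to work around Typesafe Config not being
serializable: the merged graph config is rendered to a concise HOCON
string on the driver, then re-parsed inside foreachPartition on each
executor before initS2Graph is called. A self-contained round-trip
sketch of that technique (the config key is illustrative):

    import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}

    object ConfigRoundTrip extends App {
      // Driver side: build the config the sink would assemble.
      val driverConfig: Config =
        ConfigFactory.parseString("""hbase.zookeeper.quorum = "localhost"""")
          .withFallback(ConfigFactory.load())

      // Config is not Serializable, so render it to a compact HOCON
      // string that the foreachPartition closure can capture ...
      val serialized: String =
        driverConfig.root().render(ConfigRenderOptions.concise())

      // ... and re-parse it on the executor before building the client.
      val executorConfig: Config = ConfigFactory.parseString(serialized)
      assert(executorConfig.getString("hbase.zookeeper.quorum") == "localhost")
    }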
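Within each partition, rows are consumed in groups of
S2_SINK_GROUPED_SIZE, and each group's mutateElements future is awaited
for up to S2_SINK_WAIT_TIME seconds, so at most one batch per partition
is in flight at a time. A standalone sketch of that grouped-await
pattern, with a stand-in for mutateElements (the real call returns
responses carrying isSuccess; plain Booleans stand in for them here):

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, Future}

    object GroupedMutate extends App {
      // Stand-in for s2Graph.mutateElements(elements, withWait = true).
      def mutateElements(batch: Seq[String]): Future[Seq[Boolean]] =
        Future(batch.map(_ => true))

      val groupedSize = 2 // S2_SINK_GROUPED_SIZE
      val waitTime = 10   // S2_SINK_WAIT_TIME, in seconds

      val rows = Iterator("e1", "e2", "e3", "e4", "e5")

      // Same shape as writeBatchWithMutate: fixed-size groups, blocking
      // on each group's future before pulling the next one.
      val responses = rows.grouped(groupedSize).flatMap { batch =>
        Await.result(mutateElements(batch), Duration(waitTime, "seconds"))
      }

      val (success, fail) = responses.toSeq.partition(identity)
      println(s"success : ${success.size}, fail : ${fail.size}")
    }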