You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/04 02:44:57 UTC

[01/11] incubator-s2graph git commit: implement S2GraphSink.write method for writeBatch.

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 3332f6bc1 -> 1799ae456


implement S2GraphSink.write method for writeBatch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b5535ebc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b5535ebc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b5535ebc

Branch: refs/heads/master
Commit: b5535ebccf72cd71ba45c177dc1d4ea2adaf94b8
Parents: 3332f6b
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Apr 2 17:10:45 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Apr 2 17:10:45 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |  57 ++++-
 .../SparkGraphElementLoaderTransformer.scala    |  75 +++++++
 .../serde/reader/RowBulkFormatReader.scala      |  14 ++
 .../org/apache/s2graph/s2jobs/task/Sink.scala   |  25 ++-
 .../sql/streaming/S2StreamQueryWriter.scala     |  88 ++++----
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |  13 +-
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 216 ++++++++++---------
 7 files changed, 338 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 3f80e8f..9eb9cc8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -24,9 +24,15 @@ import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
-import play.api.libs.json.Json
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.types.StructType
+import play.api.libs.json.{JsObject, Json}
 
 import scala.concurrent.ExecutionContext
+import scala.util.Try
 
 object S2GraphHelper {
   def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
@@ -78,4 +84,53 @@ object S2GraphHelper {
       Nil
     }
   }
+
+  //TODO:
+  def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
+    GraphFileOptions()
+  }
+
+  def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, reservedColumn: Set[String]): Option[GraphElement] = {
+    val timestamp = row.getAs[Long]("timestamp")
+    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
+    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
+
+    val props: Map[String, Any] = Try(row.getAs[String]("props")).toOption.flatMap(Option(_)) match { // absent or null "props" column => fall back to schema columns instead of throwing
+      case Some(propsStr:String) =>
+        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
+      case None =>
+        schema.fieldNames.flatMap { field =>
+          if (!reservedColumn.contains(field)) {
+            Seq(
+              field -> getRowValAny(row, field)
+            )
+          } else Nil
+        }.toMap
+    }
+
+    elem match {
+      case "e" | "edge" =>
+        val from = getRowValAny(row, "from")
+        val to = getRowValAny(row, "to")
+        val label = row.getAs[String]("label")
+        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
+        Some(
+          s2.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
+        )
+      case "v" | "vertex" =>
+        val id = getRowValAny(row, "id")
+        val serviceName = row.getAs[String]("service")
+        val columnName = row.getAs[String]("column")
+        Some(
+          s2.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
+        )
+      case _ =>
+        None
+    }
+  }
+
+  private def getRowValAny(row:Row, fieldName:String):Any = {
+    val idx = row.fieldIndex(fieldName)
+    row.get(idx)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
new file mode 100644
index 0000000..fcf8d4c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
+import org.apache.s2graph.core.GraphElement
+import org.apache.s2graph.s2jobs.serde.Transformer
+import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader}
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+
+class SparkGraphElementLoaderTransformer(val config: Config,
+                                         val options: GraphFileOptions) extends Transformer[Row, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
+  val reader = new RowBulkFormatReader
+
+  val writer = new KeyValueWriter
+
+  override def read(input: RDD[Row]): RDD[GraphElement] = input.mapPartitions { iter =>
+    val s2 = S2GraphHelper.initS2Graph(config)
+
+    iter.flatMap(reader.read(s2)(_))
+  }
+
+  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
+    val s2 = S2GraphHelper.initS2Graph(config)
+
+    iter.map(writer.write(s2)(_))
+  }
+
+  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
+    val degrees = elements.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.flatMap { element =>
+        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+      }
+    }.reduceByKey(_ + _)
+
+    degrees.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.map { case (degreeKey, count) =>
+        DegreeKey.toKeyValue(s2, degreeKey, count)
+      }
+    }
+  }
+
+  override def transform(input: RDD[Row]): RDD[Seq[HKeyValue]] = {
+    val elements = read(input)
+    val kvs = write(elements)
+
+    if (options.buildDegree) kvs ++ buildDegrees(elements) // return the union; it was previously computed and then discarded
+    else kvs
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
new file mode 100644
index 0000000..73e56ce
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
@@ -0,0 +1,14 @@
+package org.apache.s2graph.s2jobs.serde.reader
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+import org.apache.spark.sql.Row
+
+class RowBulkFormatReader extends GraphElementReadable[Row] {
+  private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
+
+  override def read(s2: S2Graph)(row: Row): Option[GraphElement] =
+    S2GraphHelper.sparkSqlRowToGraphElement(s2, row, row.schema, RESERVED_COLUMN)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 3d5beb6..b7a91d9 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -19,6 +19,12 @@
 
 package org.apache.s2graph.s2jobs.task
 
+import com.typesafe.config.Config
+import org.apache.s2graph.core.{GraphElement, Management}
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer, SparkGraphElementLoaderTransformer}
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
@@ -189,7 +195,22 @@ class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf)
   override def mandatoryOptions: Set[String] = Set()
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
-  override protected def writeBatch(writer: DataFrameWriter[Row]): Unit =
-    throw new RuntimeException(s"unsupported source type for ${this.getClass.getSimpleName} : ${conf.name}")
+  private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
+
+  override def write(inputDF: DataFrame):Unit = {
+    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+
+    if (inputDF.isStreaming) writeStream(df.writeStream)
+    else {
+      val config: Config = Management.toConfig(conf.options)
+      val bulkLoadOptions: GraphFileOptions = S2GraphHelper.toGraphFileOptions(conf)
+      val input = df.rdd
+
+      val transformer = new SparkGraphElementLoaderTransformer(config, bulkLoadOptions)
+      val kvs = transformer.transform(input)
+
+      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), bulkLoadOptions)
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
index e15efe8..ac37533 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.spark.sql.streaming
 
 import com.typesafe.config.ConfigFactory
 import org.apache.s2graph.core.{GraphElement, JSONParser}
+import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.Row
@@ -83,48 +84,51 @@ private [sql] class S2StreamQueryWriter(
     }
   }
 
-  private def rowToEdge(internalRow:InternalRow): Option[GraphElement] = {
-    val s2Graph = s2SinkContext.getGraph
-    val row = encoder.fromRow(internalRow)
-
-    val timestamp = row.getAs[Long]("timestamp")
-    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
-    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
-
-    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
-      case Some(propsStr:String) =>
-        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
-      case None =>
-        schema.fieldNames.flatMap { field =>
-          if (!RESERVED_COLUMN.contains(field)) {
-            Seq(
-              field -> getRowValAny(row, field)
-            )
-          } else Nil
-        }.toMap
-    }
-
-    elem match {
-      case "e" | "edge" =>
-        val from = getRowValAny(row, "from")
-        val to = getRowValAny(row, "to")
-        val label = row.getAs[String]("label")
-        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
-        Some(
-          s2Graph.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
-        )
-      case "v" | "vertex" =>
-        val id = getRowValAny(row, "id")
-        val serviceName = row.getAs[String]("service")
-        val columnName = row.getAs[String]("column")
-        Some(
-          s2Graph.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
-        )
-      case _ =>
-        logger.warn(s"'$elem' is not GraphElement. skipped!! (${row.toString()})")
-        None
-    }
-  }
+  private def rowToEdge(internalRow:InternalRow): Option[GraphElement] =
+    S2GraphHelper.sparkSqlRowToGraphElement(s2SinkContext.getGraph, encoder.fromRow(internalRow), schema, RESERVED_COLUMN)
+
+//  {
+//    val s2Graph = s2SinkContext.getGraph
+//    val row = encoder.fromRow(internalRow)
+//
+//    val timestamp = row.getAs[Long]("timestamp")
+//    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
+//    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
+//
+//    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
+//      case Some(propsStr:String) =>
+//        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
+//      case None =>
+//        schema.fieldNames.flatMap { field =>
+//          if (!RESERVED_COLUMN.contains(field)) {
+//            Seq(
+//              field -> getRowValAny(row, field)
+//            )
+//          } else Nil
+//        }.toMap
+//    }
+//
+//    elem match {
+//      case "e" | "edge" =>
+//        val from = getRowValAny(row, "from")
+//        val to = getRowValAny(row, "to")
+//        val label = row.getAs[String]("label")
+//        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
+//        Some(
+//          s2Graph.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
+//        )
+//      case "v" | "vertex" =>
+//        val id = getRowValAny(row, "id")
+//        val serviceName = row.getAs[String]("service")
+//        val columnName = row.getAs[String]("column")
+//        Some(
+//          s2Graph.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
+//        )
+//      case _ =>
+//        logger.warn(s"'$elem' is not GraphElement. skipped!! (${row.toString()})")
+//        None
+//    }
+//  }
 
   private def getRowValAny(row:Row, fieldName:String):Any = {
     val idx = row.fieldIndex(fieldName)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 78000d4..4f02808 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.s2jobs
 
 import java.io.{File, PrintWriter}
 
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
 import org.apache.s2graph.core.{Management, S2Graph}
@@ -31,11 +32,10 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import scala.util.Try
 
-class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll {
+class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
   private val master = "local[2]"
   private val appName = "example-spark"
 
-  protected var sc: SparkContext = _
   protected val options = GraphFileOptions(
     input = "/tmp/test.txt",
     tempDir = "/tmp/bulkload_tmp",
@@ -65,18 +65,15 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll {
 
   override def beforeAll(): Unit = {
     // initialize spark context.
-    val conf = new SparkConf()
-      .setMaster(master)
-      .setAppName(appName)
-
-    sc = new SparkContext(conf)
+    super.beforeAll()
 
     s2 = S2GraphHelper.initS2Graph(s2Config)
     initTestDataFile
   }
 
   override def afterAll(): Unit = {
-    if (sc != null) sc.stop()
+    super.afterAll()
+
     if (s2 != null) s2.shutdown()
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 3fbbd88..baf9b32 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,24 +19,22 @@
 
 package org.apache.s2graph.s2jobs.loader
 
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.{PostProcess, S2VertexLike}
+import org.apache.s2graph.core.PostProcess
 import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
 import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.spark.rdd.RDD
 import play.api.libs.json.Json
 
-import scala.io.Source
-
 class GraphFileGeneratorTest extends BaseSparkTest {
-  import scala.concurrent.ExecutionContext.Implicits.global
+
   import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
 
+  import scala.concurrent.ExecutionContext.Implicits.global
+
   def transformToSKeyValues(transformerMode: String, edges: Seq[String]): List[SKeyValue] = {
     transformerMode match {
       case "spark" =>
-        val input = sc.parallelize(edges)
+        val input: RDD[String] = sc.parallelize(edges)
         val transformer = new SparkBulkLoaderTransformer(s2Config, options)
         val kvs = transformer.transform(input)
         kvs.flatMap { kvs =>
@@ -54,39 +52,39 @@ class GraphFileGeneratorTest extends BaseSparkTest {
             CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
           }
         }.toList
-    }
-  }
-  test("test generateKeyValues edge only. SparkBulkLoaderTransformer") {
-    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
-    /* end of initialize model */
-
-    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
-
-    val transformerMode = "spark"
-    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
-
-    val serDe = s2.defaultStorage.serDe
-
-    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
 
-    val indexEdges = ls.flatMap { kv =>
-      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
+      case "dataset" =>
+        import spark.sqlContext.implicits._
+        val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+        val rows = elements.map { e =>
+          (e.getTs(),
+            e.getOperation(),
+            "e",
+            e.srcVertex.innerIdVal.toString,
+            e.tgtVertex.innerIdVal.toString,
+            e.label(),
+            "{}",
+            e.getDirection())
+        }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction").rdd
+
+        val transformer = new SparkGraphElementLoaderTransformer(s2Config, options)
+        val kvs = transformer.transform(rows)
+        kvs.flatMap { kvs =>
+          kvs.map { kv =>
+            CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
+          }
+        }.collect().toList
     }
-
-    val indexEdge = indexEdges.head
-
-    println(indexEdge)
-    println(bulkEdge)
-
-    bulkEdge shouldBe (indexEdge)
   }
-  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
+
+  test("test generateKeyValues edge only. SparkBulkLoaderTransformer") {
     val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
     /* end of initialize model */
 
     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
 
-    val transformerMode = "local"
+    val transformerMode = "dataset"
     val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
 
     val serDe = s2.defaultStorage.serDe
@@ -104,82 +102,106 @@ class GraphFileGeneratorTest extends BaseSparkTest {
 
     bulkEdge shouldBe (indexEdge)
   }
-
-  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
-    val serviceColumn = initTestVertexSchema(s2)
-    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
-
-    val transformerMode = "spark"
-    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
-
-    val serDe = s2.defaultStorage.serDe
-
-    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
-
-    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
-      println(Json.prettyPrint(jsValue))
-    }
-
-    bulkVertex shouldBe(vertex)
-  }
-
-  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
-    val serviceColumn = initTestVertexSchema(s2)
-    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
-
-    val transformerMode = "local"
-    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
-
-    val serDe = s2.defaultStorage.serDe
-
-    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
-
-    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
-      println(Json.prettyPrint(jsValue))
-    }
-
-    bulkVertex shouldBe(vertex)
-  }
-
-//   this test case expect options.input already exist with valid bulk load format.
-//  test("bulk load and fetch vertex: spark mode") {
-//    import scala.collection.JavaConverters._
-//    val serviceColumn = initTestVertexSchema(s2)
+//  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
+//    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+//    /* end of initialize model */
+//
+//    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
 //
-//    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-//    val input = sc.parallelize(bulkVertexLs)
+//    val transformerMode = "local"
+//    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
 //
-//    HFileGenerator.generate(sc, s2Config, input, options)
+//    val serDe = s2.defaultStorage.serDe
 //
-//    val hfileArgs = Array(options.output, options.tableName)
-//    val hbaseConfig = HBaseConfiguration.create()
+//    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
 //
-//    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+//    val indexEdges = ls.flatMap { kv =>
+//      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
+//    }
 //
-//    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-//    val json = PostProcess.verticesToJson(s2Vertices)
+//    val indexEdge = indexEdges.head
 //
-//    println(Json.prettyPrint(json))
+//    println(indexEdge)
+//    println(bulkEdge)
+//
+//    bulkEdge shouldBe (indexEdge)
 //  }
-
-//   this test case expect options.input already exist with valid bulk load format.
-//  test("bulk load and fetch vertex: mr mode") {
+//
+//  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
+//    val serviceColumn = initTestVertexSchema(s2)
+//    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+//    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
+//
+//    val transformerMode = "spark"
+//    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+//
+//    val serDe = s2.defaultStorage.serDe
+//
+//    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
+//
+//    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+//      println(Json.prettyPrint(jsValue))
+//    }
+//
+//    bulkVertex shouldBe (vertex)
+//  }
+//
+//  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
 //    val serviceColumn = initTestVertexSchema(s2)
+//    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+//    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
 //
-//    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-//    val input = sc.parallelize(bulkVertexLs)
+//    val transformerMode = "local"
+//    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
 //
-//    HFileMRGenerator.generate(sc, s2Config, input, options)
+//    val serDe = s2.defaultStorage.serDe
 //
-//    val hfileArgs = Array(options.output, options.tableName)
-//    val hbaseConfig = HBaseConfiguration.create()
+//    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
 //
-//    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-//    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-//    val json = PostProcess.verticesToJson(s2Vertices)
+//    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+//      println(Json.prettyPrint(jsValue))
+//    }
 //
-//    println(Json.prettyPrint(json))
+//    bulkVertex shouldBe (vertex)
 //  }
+
+  //   this test case expect options.input already exist with valid bulk load format.
+  //  test("bulk load and fetch vertex: spark mode") {
+  //    import scala.collection.JavaConverters._
+  //    val serviceColumn = initTestVertexSchema(s2)
+  //
+  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+  //    val input = sc.parallelize(bulkVertexLs)
+  //
+  //    HFileGenerator.generate(sc, s2Config, input, options)
+  //
+  //    val hfileArgs = Array(options.output, options.tableName)
+  //    val hbaseConfig = HBaseConfiguration.create()
+  //
+  //    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  //
+  //    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+  //    val json = PostProcess.verticesToJson(s2Vertices)
+  //
+  //    println(Json.prettyPrint(json))
+  //  }
+
+  //   this test case expect options.input already exist with valid bulk load format.
+  //  test("bulk load and fetch vertex: mr mode") {
+  //    val serviceColumn = initTestVertexSchema(s2)
+  //
+  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+  //    val input = sc.parallelize(bulkVertexLs)
+  //
+  //    HFileMRGenerator.generate(sc, s2Config, input, options)
+  //
+  //    val hfileArgs = Array(options.output, options.tableName)
+  //    val hbaseConfig = HBaseConfiguration.create()
+  //
+  //    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  //    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+  //    val json = PostProcess.verticesToJson(s2Vertices)
+  //
+  //    println(Json.prettyPrint(json))
+  //  }
 }


[03/11] incubator-s2graph git commit: implement writeBatch only for LoadIncrementHFile.

Posted by st...@apache.org.
implement writeBatch only for LoadIncrementHFile.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/86dcc112
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/86dcc112
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/86dcc112

Branch: refs/heads/master
Commit: 86dcc112d9582435c136ca60e050be3e993e685f
Parents: ae19dc1
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Apr 2 18:58:51 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Apr 2 19:02:53 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |   5 +-
 .../s2jobs/loader/GraphFileOptions.scala        |  24 +++
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  10 +-
 .../serde/reader/RowBulkFormatReader.scala      |  19 ++
 .../org/apache/s2graph/s2jobs/task/Sink.scala   |  16 +-
 .../sql/streaming/S2StreamQueryWriter.scala     |  48 -----
 .../s2jobs/dump/GraphFileDumperTest.scala       |  97 ----------
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 192 +++++++++----------
 .../apache/s2graph/s2jobs/task/SinkTest.scala   |  62 ++++++
 9 files changed, 221 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 9eb9cc8..69b3716 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -85,9 +85,10 @@ object S2GraphHelper {
     }
   }
 
-  //TODO:
   def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
-    GraphFileOptions()
+    val args = taskConf.options.flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
+
+    GraphFileOptions.toOption(args)
   }
 
   def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, reservedColumn: Set[String]): Option[GraphElement] = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
index 3e3ffb9..4bf8379 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
@@ -91,6 +91,10 @@ object GraphFileOptions {
       (inner.head, inner.last)
     }).toMap
   }
+
+  def toLabelMappingString(labelMapping: Map[String, String]): String =
+    labelMapping.map { case (k, v) => Seq(k, v).mkString(":") }.mkString(",")
+
 }
 /**
   * Option case class for TransferToHFile.
@@ -135,4 +139,24 @@ case class GraphFileOptions(input: String = "",
       "db.default.driver" -> dbDriver
     )
   }
+
+  def toCommand: Array[String] =
+    Array(
+      "--input", input,
+      "--tempDir", tempDir,
+      "--output", output,
+      "--zkQuorum", zkQuorum,
+      "--table", tableName,
+      "--dbUrl", dbUrl,
+      "--dbUser", dbUser,
+      "--dbPassword", dbPassword,
+      "--dbDriver", dbDriver,
+      "--maxHFilePerRegionServer", maxHFilePerRegionServer.toString,
+      "--numRegions", numRegions.toString,
+      "--labelMapping", GraphFileOptions.toLabelMappingString(labelMapping),
+      "--autoEdgeCreate", autoEdgeCreate.toString,
+      "--buildDegree", buildDegree.toString,
+      "--incrementalLoad", incrementalLoad.toString,
+      "--method", method
+    )
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index 431631b..da190ee 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -24,10 +24,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.ConnectionFactory
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
+import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableOutputFormat}
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
+import org.apache.hadoop.util.ToolRunner
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
 import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
@@ -123,5 +124,12 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
 
     HFileGenerator.generateHFile(sc, config, kvs, _options)
   }
+
+  def loadIncrementHFile(options: GraphFileOptions): Int = {
+    /* LoadIncrementHFiles */
+    val hfileArgs = Array(options.output, options.tableName)
+    val hbaseConfig = HBaseConfiguration.create()
+    ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
index 73e56ce..12b2ba2 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.s2jobs.serde.reader
 
 import org.apache.s2graph.core.{GraphElement, S2Graph}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index bc67822..7c4c857 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,6 +20,9 @@
 package org.apache.s2graph.s2jobs.task
 
 import com.typesafe.config.Config
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
+import org.apache.hadoop.util.ToolRunner
 import org.apache.s2graph.core.Management
 import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
@@ -206,25 +209,26 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
-  private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
-
   override def write(inputDF: DataFrame): Unit = {
     val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
 
     if (inputDF.isStreaming) writeStream(df.writeStream)
     else {
-      val config: Config = Management.toConfig(conf.options)
-      val bulkLoadOptions: GraphFileOptions = S2GraphHelper.toGraphFileOptions(conf)
+      val options = S2GraphHelper.toGraphFileOptions(conf)
+      val config = Management.toConfig(options.toConfigParams)
       val input = df.rdd
 
-      val transformer = new SparkBulkLoaderTransformer(config, bulkLoadOptions)
+      val transformer = new SparkBulkLoaderTransformer(config, options)
 
       implicit val reader = new RowBulkFormatReader
       implicit val writer = new KeyValueWriter
 
       val kvs = transformer.transform(input)
 
-      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), bulkLoadOptions)
+      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+
+      // finish bulk load by execute LoadIncrementHFile.
+      HFileGenerator.loadIncrementHFile(options)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
index ac37533..f6fecd7 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
@@ -86,52 +86,4 @@ private [sql] class S2StreamQueryWriter(
 
   private def rowToEdge(internalRow:InternalRow): Option[GraphElement] =
     S2GraphHelper.sparkSqlRowToGraphElement(s2SinkContext.getGraph, encoder.fromRow(internalRow), schema, RESERVED_COLUMN)
-
-//  {
-//    val s2Graph = s2SinkContext.getGraph
-//    val row = encoder.fromRow(internalRow)
-//
-//    val timestamp = row.getAs[Long]("timestamp")
-//    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
-//    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
-//
-//    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
-//      case Some(propsStr:String) =>
-//        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
-//      case None =>
-//        schema.fieldNames.flatMap { field =>
-//          if (!RESERVED_COLUMN.contains(field)) {
-//            Seq(
-//              field -> getRowValAny(row, field)
-//            )
-//          } else Nil
-//        }.toMap
-//    }
-//
-//    elem match {
-//      case "e" | "edge" =>
-//        val from = getRowValAny(row, "from")
-//        val to = getRowValAny(row, "to")
-//        val label = row.getAs[String]("label")
-//        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
-//        Some(
-//          s2Graph.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
-//        )
-//      case "v" | "vertex" =>
-//        val id = getRowValAny(row, "id")
-//        val serviceName = row.getAs[String]("service")
-//        val columnName = row.getAs[String]("column")
-//        Some(
-//          s2Graph.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
-//        )
-//      case _ =>
-//        logger.warn(s"'$elem' is not GraphElement. skipped!! (${row.toString()})")
-//        None
-//    }
-//  }
-
-  private def getRowValAny(row:Row, fieldName:String):Any = {
-    val idx = row.fieldIndex(fieldName)
-    row.get(idx)
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala
deleted file mode 100644
index 81566f9..0000000
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-//package org.apache.s2graph.s2jobs.dump
-//
-//import org.apache.s2graph.core._
-//import org.apache.s2graph.core.types.HBaseType
-//import org.apache.s2graph.s2jobs.S2GraphHelper
-//import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-//import play.api.libs.json.Json
-//
-//class GraphFileDumperTest extends FunSuite with Matchers with BeforeAndAfterAll {
-//  private val master = "local[2]"
-//  private val appName = "example-spark"
-//
-//  private var sc: SparkContext = _
-//  val options = GraphFileOptions(
-//    input = "/tmp/imei-20.txt",
-//    tempDir = "/tmp/bulkload_tmp",
-//    output = "/tmp/s2graph_bulkload",
-//    zkQuorum = "localhost",
-//    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
-//    dbUser = "sa",
-//    dbPassword = "sa",
-//    dbDriver = "org.h2.Driver",
-//    tableName = "s2graph",
-//    maxHFilePerRegionServer = 1,
-//    numRegions = 3,
-//    compressionAlgorithm = "NONE",
-//    buildDegree = false,
-//    autoEdgeCreate = false)
-//
-//  val s2Config = Management.toConfig(options.toConfigParams)
-//
-//  val tableName = options.tableName
-//  val schemaVersion = HBaseType.DEFAULT_VERSION
-//  val compressionAlgorithm: String = options.compressionAlgorithm
-//  var s2: S2Graph = _
-//
-//  override def beforeAll(): Unit = {
-//    // initialize spark context.
-//    val conf = new SparkConf()
-//      .setMaster(master)
-//      .setAppName(appName)
-//
-//    sc = new SparkContext(conf)
-//
-//    s2 = S2GraphHelper.initS2Graph(s2Config)
-//  }
-//
-//  override def afterAll(): Unit = {
-//    if (sc != null) sc.stop()
-//    if (s2 != null) s2.shutdown()
-//  }
-//
-//  test("test dump.") {
-//    implicit val graph = s2
-//    val snapshotPath = "/usr/local/var/hbase"
-//    val restorePath = "/tmp/hbase_restore"
-//    val snapshotTableNames = Seq("s2graph-snapshot")
-//
-//    val cellLs = HFileDumper.toKeyValue(sc, snapshotPath, restorePath,
-//      snapshotTableNames, columnFamily = "v")
-//
-//    val kvsLs = cellLs.map(CanGraphElement.cellsToSKeyValues).collect()
-//
-//    val elements = kvsLs.flatMap { kvs =>
-//      CanGraphElement.sKeyValueToGraphElement(s2)(kvs)
-//    }
-//
-//    elements.foreach { element =>
-//      val v = element.asInstanceOf[S2VertexLike]
-//      val json = Json.prettyPrint(PostProcess.s2VertexToJson(v).get)
-//
-//      println(json)
-//    }
-//
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index c382813..991897b 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.s2jobs.loader
 
-import org.apache.s2graph.core.PostProcess
+import org.apache.s2graph.core.{PostProcess, S2VertexLike}
 import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
 import org.apache.s2graph.s2jobs.BaseSparkTest
 import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader}
@@ -27,6 +27,8 @@ import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.spark.rdd.RDD
 import play.api.libs.json.Json
 
+import scala.io.Source
+
 class GraphFileGeneratorTest extends BaseSparkTest {
 
   import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
@@ -116,106 +118,100 @@ class GraphFileGeneratorTest extends BaseSparkTest {
 
     bulkEdge shouldBe (indexEdge)
   }
-//  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
-//    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
-//    /* end of initialize model */
-//
-//    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
-//
-//    val transformerMode = "local"
-//    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
-//
-//    val serDe = s2.defaultStorage.serDe
-//
-//    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
-//
-//    val indexEdges = ls.flatMap { kv =>
-//      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
-//    }
-//
-//    val indexEdge = indexEdges.head
-//
-//    println(indexEdge)
-//    println(bulkEdge)
-//
-//    bulkEdge shouldBe (indexEdge)
-//  }
-//
-//  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
-//    val serviceColumn = initTestVertexSchema(s2)
-//    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-//    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
-//
-//    val transformerMode = "spark"
-//    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
-//
-//    val serDe = s2.defaultStorage.serDe
-//
-//    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
-//
-//    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
-//      println(Json.prettyPrint(jsValue))
-//    }
-//
-//    bulkVertex shouldBe (vertex)
-//  }
-//
-//  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
-//    val serviceColumn = initTestVertexSchema(s2)
-//    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-//    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
+  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
+    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+    /* end of initialize model */
+
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+
+    val transformerMode = "local"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
+
+    val serDe = s2.defaultStorage.serDe
+
+    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
+
+    val indexEdges = ls.flatMap { kv =>
+      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
+    }
+
+    val indexEdge = indexEdges.head
+
+    println(indexEdge)
+    println(bulkEdge)
+
+    bulkEdge shouldBe (indexEdge)
+  }
+
+  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
+    val serviceColumn = initTestVertexSchema(s2)
+    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
+
+    val transformerMode = "spark"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+
+    val serDe = s2.defaultStorage.serDe
+
+    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
+
+    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+      println(Json.prettyPrint(jsValue))
+    }
+
+    bulkVertex shouldBe (vertex)
+  }
+
+  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
+    val serviceColumn = initTestVertexSchema(s2)
+    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
+
+    val transformerMode = "local"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+
+    val serDe = s2.defaultStorage.serDe
+
+    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
+
+    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+      println(Json.prettyPrint(jsValue))
+    }
+
+    bulkVertex shouldBe (vertex)
+  }
+
+  //   this test case expect options.input already exist with valid bulk load format.
+    test("bulk load and fetch vertex: spark mode") {
+      import scala.collection.JavaConverters._
+      val serviceColumn = initTestVertexSchema(s2)
+
+      val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+      val input = sc.parallelize(bulkVertexLs)
+
+      HFileGenerator.generate(sc, s2Config, input, options)
+      HFileGenerator.loadIncrementHFile(options)
+
+      val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+      val json = PostProcess.verticesToJson(s2Vertices)
+
+      println(Json.prettyPrint(json))
+    }
+
+  //   this test case expect options.input already exist with valid bulk load format.
+//    test("bulk load and fetch vertex: mr mode") {
+//      import scala.collection.JavaConverters._
+//      val serviceColumn = initTestVertexSchema(s2)
 //
-//    val transformerMode = "local"
-//    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+//      val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+//      val input = sc.parallelize(bulkVertexLs)
 //
-//    val serDe = s2.defaultStorage.serDe
+//      HFileMRGenerator.generate(sc, s2Config, input, options)
+//      HFileGenerator.loadIncrementHFile(options)
 //
-//    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
+//      val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+//      val json = PostProcess.verticesToJson(s2Vertices)
 //
-//    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
-//      println(Json.prettyPrint(jsValue))
+//      println(Json.prettyPrint(json))
 //    }
-//
-//    bulkVertex shouldBe (vertex)
-//  }
-
-  //   this test case expect options.input already exist with valid bulk load format.
-  //  test("bulk load and fetch vertex: spark mode") {
-  //    import scala.collection.JavaConverters._
-  //    val serviceColumn = initTestVertexSchema(s2)
-  //
-  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-  //    val input = sc.parallelize(bulkVertexLs)
-  //
-  //    HFileGenerator.generate(sc, s2Config, input, options)
-  //
-  //    val hfileArgs = Array(options.output, options.tableName)
-  //    val hbaseConfig = HBaseConfiguration.create()
-  //
-  //    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-  //
-  //    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-  //    val json = PostProcess.verticesToJson(s2Vertices)
-  //
-  //    println(Json.prettyPrint(json))
-  //  }
-
-  //   this test case expect options.input already exist with valid bulk load format.
-  //  test("bulk load and fetch vertex: mr mode") {
-  //    val serviceColumn = initTestVertexSchema(s2)
-  //
-  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-  //    val input = sc.parallelize(bulkVertexLs)
-  //
-  //    HFileMRGenerator.generate(sc, s2Config, input, options)
-  //
-  //    val hfileArgs = Array(options.output, options.tableName)
-  //    val hbaseConfig = HBaseConfiguration.create()
-  //
-  //    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-  //    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-  //    val json = PostProcess.verticesToJson(s2Vertices)
-  //
-  //    println(Json.prettyPrint(json))
-  //  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
new file mode 100644
index 0000000..a21b3df
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.s2graph.core.S2EdgeLike
+import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.spark.sql.DataFrame
+
+import scala.collection.JavaConverters._
+
+class SinkTest extends BaseSparkTest {
+  def toDataFrame(edges: Seq[String]): DataFrame = {
+    import spark.sqlContext.implicits._
+    val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+    elements.map { e =>
+      (e.getTs(),
+        e.getOperation(),
+        "e",
+        e.srcVertex.innerIdVal.toString,
+        e.tgtVertex.innerIdVal.toString,
+        e.label(),
+        "{}",
+        e.getDirection())
+    }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
+  }
+
+  test("S2graphSink writeBatch.") {
+    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+    val df = toDataFrame(Seq(bulkEdgeString))
+    val args = options.toCommand.grouped(2).map { kv =>
+      kv.head -> kv.last
+    }.toMap
+
+    val conf = TaskConf("test", "sql", Seq("input"), args)
+
+    val sink = new S2graphSink("testQuery", conf)
+    sink.write(df)
+
+    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+    s2Edges.foreach { edge => println(edge) }
+  }
+}


[10/11] incubator-s2graph git commit: add runLoadIncrementalHFiles option for S2graphSink.write.

Posted by st...@apache.org.
add runLoadIncrementalHFiles option for S2graphSink.write.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f4bd2842
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f4bd2842
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f4bd2842

Branch: refs/heads/master
Commit: f4bd2842f7c0bc0b3eb1a7f5467c25e38e4da999
Parents: f2857d1
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Apr 4 11:31:26 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed Apr 4 11:31:26 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/loader/HFileGenerator.scala   | 2 +-
 .../main/scala/org/apache/s2graph/s2jobs/task/Sink.scala    | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f4bd2842/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index da190ee..e7535d4 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -125,7 +125,7 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
     HFileGenerator.generateHFile(sc, config, kvs, _options)
   }
 
-  def loadIncrementHFile(options: GraphFileOptions): Int = {
+  def loadIncrementalHFiles(options: GraphFileOptions): Int = {
     /* LoadIncrementHFiles */
     val hfileArgs = Array(options.output, options.tableName)
     val hbaseConfig = HBaseConfiguration.create()

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f4bd2842/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 77e8cb1..866aa47 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -214,7 +214,7 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
-  private def writeBatchBulkload(df: DataFrame): Unit = {
+  private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = {
     val options = TaskConf.toGraphFileOptions(conf)
     val config = Management.toConfig(options.toConfigParams)
     val input = df.rdd
@@ -229,7 +229,7 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
     HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
 
     // finish bulk load by execute LoadIncrementHFile.
-    HFileGenerator.loadIncrementHFile(options)
+    if (runLoadIncrementalHFiles) HFileGenerator.loadIncrementalHFiles(options)
   }
 
   private def writeBatchWithMutate(df:DataFrame):Unit = {
@@ -267,9 +267,10 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
     else {
       conf.options.getOrElse("writeMethod", "mutate") match {
         case "mutate" => writeBatchWithMutate(df)
-        case "bulk" => writeBatchBulkload(df)
+        case "bulk" =>
+          val runLoadIncrementalHFiles = conf.options.getOrElse("runLoadIncrementalHFiles", "true").toBoolean
+          writeBatchBulkload(df, runLoadIncrementalHFiles)
         case writeMethod:String => throw new IllegalArgumentException(s"unsupported write method '$writeMethod' (valid method: mutate, bulk)")
-
       }
     }
   }


[02/11] incubator-s2graph git commit: simplify Transformer interface.

Posted by st...@apache.org.
simplify Transformer interface.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/ae19dc11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/ae19dc11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/ae19dc11

Branch: refs/heads/master
Commit: ae19dc11c039c92deb92ea42fdb48b3bbe7bd6dd
Parents: b5535eb
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Apr 2 17:38:09 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Apr 2 17:38:09 2018 +0900

----------------------------------------------------------------------
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  6 ++
 .../s2jobs/loader/HFileMRGenerator.scala        |  6 ++
 .../loader/LocalBulkLoaderTransformer.scala     | 28 +++-----
 .../loader/SparkBulkLoaderTransformer.scala     | 49 ++++++-------
 .../SparkGraphElementLoaderTransformer.scala    | 75 --------------------
 .../s2jobs/serde/GraphElementWritable.scala     |  4 ++
 .../s2graph/s2jobs/serde/Transformer.scala      | 19 ++---
 .../s2jobs/serde/writer/KeyValueWriter.scala    |  6 +-
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 55 ++++++++------
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 16 ++++-
 10 files changed, 107 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index b4ac51f..431631b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
@@ -113,6 +115,10 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
                         rdd: RDD[String],
                         _options: GraphFileOptions): Unit = {
     val transformer = new SparkBulkLoaderTransformer(config, _options)
+
+    implicit val reader = new TsvBulkFormatReader
+    implicit val writer = new KeyValueWriter
+
     val kvs = transformer.transform(rdd).flatMap(kvs => kvs)
 
     HFileGenerator.generateHFile(sc, config, kvs, _options)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
index 3502bee..87968f0 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
@@ -34,6 +34,8 @@ import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
 import org.apache.hadoop.mapreduce.{Job, Mapper}
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
@@ -105,6 +107,10 @@ object HFileMRGenerator extends RawFileGenerator[String, KeyValue] {
                input: RDD[String],
                options: GraphFileOptions): RDD[KeyValue] = {
     val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+    implicit val reader = new TsvBulkFormatReader
+    implicit val writer = new KeyValueWriter
+
     transformer.transform(input).flatMap(kvs => kvs)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
index 7d405a6..ad3483c 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
@@ -20,41 +20,31 @@
 package org.apache.s2graph.s2jobs.loader
 
 import com.typesafe.config.Config
-import org.apache.hadoop.hbase.KeyValue
 import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.serde.Transformer
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
 import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
 
 import scala.concurrent.ExecutionContext
+import scala.reflect.ClassTag
 
 class LocalBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] {
+                                 val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] {
   val s2: S2Graph = S2GraphHelper.initS2Graph(config)
 
-  override val reader = new TsvBulkFormatReader
-  override val writer = new KeyValueWriter
-
-  override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_))
-
-  override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_))
-
-  override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = {
+  override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = {
     val degrees = elements.flatMap { element =>
       DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
     }.groupBy(_._1).mapValues(_.map(_._2).sum)
 
     degrees.toSeq.map { case (degreeKey, count) =>
-      DegreeKey.toKeyValue(s2, degreeKey, count)
+      writer.writeDegree(s2)(degreeKey, count)
     }
   }
 
-  override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = {
-    val elements = read(input)
-    val kvs = write(elements)
-
-    val degrees = if (options.buildDegree) buildDegrees(elements) else Nil
+  override def transform[S: ClassTag, T: ClassTag](input: Seq[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): Seq[T] = {
+    val elements = input.flatMap(reader.read(s2)(_))
+    val kvs = elements.map(writer.write(s2)(_))
+    val degrees = if (options.buildDegree) buildDegrees[T](elements) else Nil
 
     kvs ++ degrees
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index cd991e1..03d9784 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -20,35 +20,17 @@
 package org.apache.s2graph.s2jobs.loader
 
 import com.typesafe.config.Config
-import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
 import org.apache.s2graph.core.GraphElement
-import org.apache.s2graph.s2jobs.serde.Transformer
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
 import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
 import org.apache.spark.rdd.RDD
 
-class SparkBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
-  val reader = new TsvBulkFormatReader
-
-  val writer = new KeyValueWriter
-
-  override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
+import scala.reflect.ClassTag
 
-    iter.flatMap { line =>
-      reader.read(s2)(line)
-    }
-  }
-
-  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
-
-    iter.map(writer.write(s2)(_))
-  }
+class SparkBulkLoaderTransformer(val config: Config,
+                                 val options: GraphFileOptions) extends Transformer[RDD] {
 
-  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
+  override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
     val degrees = elements.mapPartitions { iter =>
       val s2 = S2GraphHelper.initS2Graph(config)
 
@@ -61,16 +43,27 @@ class SparkBulkLoaderTransformer(val config: Config,
       val s2 = S2GraphHelper.initS2Graph(config)
 
       iter.map { case (degreeKey, count) =>
-        DegreeKey.toKeyValue(s2, degreeKey, count)
+        writer.writeDegree(s2)(degreeKey, count)
       }
     }
   }
 
-  override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = {
-    val elements = read(input)
-    val kvs = write(elements)
+  override def transform[S: ClassTag, T: ClassTag](input: RDD[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): RDD[T] = {
+    val elements = input.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.flatMap { line =>
+        reader.read(s2)(line)
+      }
+    }
+
+    val kvs = elements.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.map(writer.write(s2)(_))
+    }
 
     if (options.buildDegree) kvs ++ buildDegrees(elements)
-    kvs
+    else kvs
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
deleted file mode 100644
index fcf8d4c..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.loader
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
-import org.apache.s2graph.core.GraphElement
-import org.apache.s2graph.s2jobs.serde.Transformer
-import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader}
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
-
-class SparkGraphElementLoaderTransformer(val config: Config,
-                                         val options: GraphFileOptions) extends Transformer[Row, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
-  val reader = new RowBulkFormatReader
-
-  val writer = new KeyValueWriter
-
-  override def read(input: RDD[Row]): RDD[GraphElement] = input.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
-
-    iter.flatMap(reader.read(s2)(_))
-  }
-
-  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
-
-    iter.map(writer.write(s2)(_))
-  }
-
-  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
-    val degrees = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
-
-      iter.flatMap { element =>
-        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
-      }
-    }.reduceByKey(_ + _)
-
-    degrees.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
-
-      iter.map { case (degreeKey, count) =>
-        DegreeKey.toKeyValue(s2, degreeKey, count)
-      }
-    }
-  }
-
-  override def transform(input: RDD[Row]): RDD[Seq[HKeyValue]] = {
-    val elements = read(input)
-    val kvs = write(elements)
-
-    if (options.buildDegree) kvs ++ buildDegrees(elements)
-    kvs
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
index ae082d8..f71a9e8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
@@ -20,7 +20,11 @@
 package org.apache.s2graph.s2jobs.serde
 
 import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.DegreeKey
 
 trait GraphElementWritable[T] extends Serializable {
+
   def write(s2: S2Graph)(element: GraphElement): T
+
+  def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): T
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
index 3902c63..ef1bd29 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
@@ -23,28 +23,21 @@ import com.typesafe.config.Config
 import org.apache.s2graph.core.GraphElement
 import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 
+import scala.reflect.ClassTag
+
 /**
   * Define serialize/deserialize.
   * Source -> GraphElement
   * GraphElement -> Target
   *
-  * @tparam S : Source class. ex) String, RDF.Statement, ...
-  * @tparam T : Target class. ex) KeyValue, Array[Byte], String, ...
   * @tparam M : Container type. ex) RDD, Seq, List, ...
   */
-trait Transformer[S, T, M[_]] extends Serializable {
+trait Transformer[M[_]] extends Serializable {
   val config: Config
   val options: GraphFileOptions
 
-  val reader: GraphElementReadable[S]
-
-  val writer: GraphElementWritable[T]
-
-  def read(input: M[S]): M[GraphElement]
-
-  def write(elements: M[GraphElement]): M[T]
-
-  def buildDegrees(elements: M[GraphElement]): M[T]
+  def buildDegrees[T: ClassTag](elements: M[GraphElement])(implicit writer: GraphElementWritable[T]): M[T]
 
-  def transform(input: M[S]): M[T]
+  def transform[S: ClassTag, T: ClassTag](input: M[S])
+                           (implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): M[T]
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
index 02034af..cc1f801 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.s2jobs.serde.writer
 
 import org.apache.hadoop.hbase.KeyValue
 import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
 import org.apache.s2graph.s2jobs.serde.GraphElementWritable
 
 class KeyValueWriter(autoEdgeCreate: Boolean = false) extends GraphElementWritable[Seq[KeyValue]] {
@@ -30,4 +30,8 @@ class KeyValueWriter(autoEdgeCreate: Boolean = false) extends GraphElementWritab
       new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value)
     }
   }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): Seq[KeyValue] = {
+    DegreeKey.toKeyValue(s2, degreeKey, count)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index b7a91d9..bc67822 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,29 +20,30 @@
 package org.apache.s2graph.s2jobs.task
 
 import com.typesafe.config.Config
-import org.apache.s2graph.core.{GraphElement, Management}
+import org.apache.s2graph.core.Management
 import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer, SparkGraphElementLoaderTransformer}
-import org.apache.s2graph.s2jobs.serde.GraphElementReadable
-import org.apache.spark.rdd.RDD
+import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
 /**
   * Sink
+  *
   * @param queryName
   * @param conf
   */
-abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
+abstract class Sink(queryName: String, override val conf: TaskConf) extends Task {
   val DEFAULT_CHECKPOINT_LOCATION = s"/tmp/streamingjob/${queryName}/${conf.name}"
   val DEFAULT_TRIGGER_INTERVAL = "10 seconds"
 
-  val FORMAT:String
+  val FORMAT: String
 
-  def preprocess(df:DataFrame):DataFrame = df
+  def preprocess(df: DataFrame): DataFrame = df
 
-  def write(inputDF: DataFrame):Unit = {
+  def write(inputDF: DataFrame): Unit = {
     val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
 
     if (inputDF.isStreaming) writeStream(df.writeStream)
@@ -56,7 +57,7 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
       case "update" => OutputMode.Update()
       case "complete" => OutputMode.Complete()
       case _ => logger.warn(s"${LOG_PREFIX} unsupported output mode. use default output mode 'append'")
-                OutputMode.Append()
+        OutputMode.Append()
     }
     val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL)
     val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION)
@@ -94,9 +95,9 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
     writer.save(outputPath)
   }
 
-  protected def repartition(df:DataFrame, defaultParallelism:Int) = {
+  protected def repartition(df: DataFrame, defaultParallelism: Int) = {
     conf.options.get("numPartitions").map(n => Integer.parseInt(n)) match {
-      case Some(numOfPartitions:Int) =>
+      case Some(numOfPartitions: Int) =>
         if (numOfPartitions > defaultParallelism) df.repartition(numOfPartitions)
         else df.coalesce(numOfPartitions)
       case None => df
@@ -106,14 +107,16 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
 
 /**
   * KafkaSink
+  *
   * @param queryName
   * @param conf
   */
-class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class KafkaSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "topic")
+
   override val FORMAT: String = "kafka"
 
-  override def preprocess(df:DataFrame):DataFrame = {
+  override def preprocess(df: DataFrame): DataFrame = {
     import org.apache.spark.sql.functions._
 
     logger.debug(s"${LOG_PREFIX} schema: ${df.schema}")
@@ -124,7 +127,7 @@ class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
 
         val columns = df.columns
         df.select(concat_ws(delimiter, columns.map(c => col(c)): _*).alias("value"))
-      case format:String =>
+      case format: String =>
         if (format != "json") logger.warn(s"${LOG_PREFIX} unsupported format '$format'. use default json format")
         df.selectExpr("to_json(struct(*)) AS value")
     }
@@ -136,21 +139,25 @@ class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
 
 /**
   * FileSink
+  *
   * @param queryName
   * @param conf
   */
-class FileSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class FileSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set("path", "format")
+
   override val FORMAT: String = conf.options.getOrElse("format", "parquet")
 }
 
 /**
   * HiveSink
+  *
   * @param queryName
   * @param conf
   */
-class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class HiveSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set("database", "table")
+
   override val FORMAT: String = "hive"
 
   override protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = {
@@ -167,11 +174,13 @@ class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
 
 /**
   * ESSink
+  *
   * @param queryName
   * @param conf
   */
-class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class ESSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set("es.nodes", "path", "es.port")
+
   override val FORMAT: String = "es"
 
   override def write(inputDF: DataFrame): Unit = {
@@ -188,16 +197,18 @@ class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
 
 /**
   * S2graphSink
+  *
   * @param queryName
   * @param conf
   */
-class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set()
+
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
   private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
 
-  override def write(inputDF: DataFrame):Unit = {
+  override def write(inputDF: DataFrame): Unit = {
     val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
 
     if (inputDF.isStreaming) writeStream(df.writeStream)
@@ -206,7 +217,11 @@ class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf)
       val bulkLoadOptions: GraphFileOptions = S2GraphHelper.toGraphFileOptions(conf)
       val input = df.rdd
 
-      val transformer = new SparkGraphElementLoaderTransformer(config, bulkLoadOptions)
+      val transformer = new SparkBulkLoaderTransformer(config, bulkLoadOptions)
+
+      implicit val reader = new RowBulkFormatReader
+      implicit val writer = new KeyValueWriter
+
       val kvs = transformer.transform(input)
 
       HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), bulkLoadOptions)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index baf9b32..c382813 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -22,6 +22,8 @@ package org.apache.s2graph.s2jobs.loader
 import org.apache.s2graph.core.PostProcess
 import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
 import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader}
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.spark.rdd.RDD
 import play.api.libs.json.Json
 
@@ -36,6 +38,10 @@ class GraphFileGeneratorTest extends BaseSparkTest {
       case "spark" =>
         val input: RDD[String] = sc.parallelize(edges)
         val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+        implicit val reader = new TsvBulkFormatReader
+        implicit val writer = new KeyValueWriter
+
         val kvs = transformer.transform(input)
         kvs.flatMap { kvs =>
           kvs.map { kv =>
@@ -46,6 +52,10 @@ class GraphFileGeneratorTest extends BaseSparkTest {
       case "local" =>
         val input = edges
         val transformer = new LocalBulkLoaderTransformer(s2Config, options)
+
+        implicit val reader = new TsvBulkFormatReader
+        implicit val writer = new KeyValueWriter
+
         val kvs = transformer.transform(input)
         kvs.flatMap { kvs =>
           kvs.map { kv =>
@@ -68,7 +78,11 @@ class GraphFileGeneratorTest extends BaseSparkTest {
             e.getDirection())
         }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction").rdd
 
-        val transformer = new SparkGraphElementLoaderTransformer(s2Config, options)
+        val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+        implicit val reader = new RowBulkFormatReader
+        implicit val writer = new KeyValueWriter
+
         val kvs = transformer.transform(rows)
         kvs.flatMap { kvs =>
           kvs.map { kv =>


[08/11] incubator-s2graph git commit: merge writeBatch for S2GraphSink

Posted by st...@apache.org.
merge writeBatch for S2GraphSink


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/fc9fde7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/fc9fde7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/fc9fde7f

Branch: refs/heads/master
Commit: fc9fde7fbde1bcc18f2dec41a54ec70af01cd80c
Parents: c47359c c129ed0
Author: Chul Kang <el...@apache.org>
Authored: Tue Apr 3 18:47:13 2018 +0900
Committer: Chul Kang <el...@apache.org>
Committed: Tue Apr 3 18:47:13 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |  9 -----
 .../s2jobs/loader/GraphFileOptions.scala        |  6 ++++
 .../s2graph/s2jobs/serde/Transformer.scala      |  2 +-
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 35 ++++++++++----------
 .../org/apache/s2graph/s2jobs/task/Task.scala   |  9 +++++
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |  2 --
 .../apache/s2graph/s2jobs/task/SinkTest.scala   | 31 ++++++++++++-----
 7 files changed, 57 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fc9fde7f/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 20c1558,0dce19a..77e8cb1
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@@ -214,21 -209,8 +214,8 @@@ class S2graphSink(queryName: String, co
  
    override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
  
-   override def write(inputDF: DataFrame): Unit = {
-     val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
- 
-     if (inputDF.isStreaming) writeStream(df.writeStream)
-     else {
-       conf.options.getOrElse("writeMethod", "mutate") match {
-         case "bulk" => writeBatchWithBulkload(df)
-         case "mutate" => writeBatchWithMutate(df)
-       }
- 
-     }
-   }
- 
-   private def writeBatchWithBulkload(df:DataFrame):Unit = {
-     val options = S2GraphHelper.toGraphFileOptions(conf)
 -  private def bulkload(df: DataFrame): Unit = {
++  private def writeBatchBulkload(df: DataFrame): Unit = {
+     val options = TaskConf.toGraphFileOptions(conf)
      val config = Management.toConfig(options.toConfigParams)
      val input = df.rdd
  
@@@ -245,32 -227,13 +232,46 @@@
      HFileGenerator.loadIncrementHFile(options)
    }
  
 +  private def writeBatchWithMutate(df:DataFrame):Unit = {
 +    import scala.collection.JavaConversions._
 +    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
 +
-     val graphConfig:Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
++    val graphConfig: Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
 +    val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
 +
 +    val reader = new RowBulkFormatReader
 +
 +    val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
 +    val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
 +
-     df.foreachPartition{ iters =>
++    df.foreachPartition { iters =>
 +      val config = ConfigFactory.parseString(serializedConfig)
 +      val s2Graph = S2GraphHelper.initS2Graph(config)
 +
 +      val responses = iters.grouped(groupedSize).flatMap { rows =>
 +        val elements = rows.flatMap(row => reader.read(s2Graph)(row))
 +
 +        val mutateF = s2Graph.mutateElements(elements, true)
 +        Await.result(mutateF, Duration(waitTime, "seconds"))
 +      }
 +
 +      val (success, fail) = responses.toSeq.partition(r => r.isSuccess)
 +      logger.info(s"success : ${success.size}, fail : ${fail.size}")
 +    }
 +  }
++
+   override def write(inputDF: DataFrame): Unit = {
+     val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+ 
+     if (inputDF.isStreaming) writeStream(df.writeStream)
+     else {
 -      bulkload(df)
++      conf.options.getOrElse("writeMethod", "mutate") match {
++        case "mutate" => writeBatchWithMutate(df)
++        case "bulk" => writeBatchBulkload(df)
++        case writeMethod:String => throw new IllegalArgumentException(s"unsupported write method '$writeMethod' (valid method: mutate, bulk)")
++
++      }
+     }
+   }
  }
  

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fc9fde7f/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 4f02808,4f02808..0461d1e
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@@ -33,8 -33,8 +33,6 @@@ import org.scalatest.{BeforeAndAfterAll
  import scala.util.Try
  
  class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
--  private val master = "local[2]"
--  private val appName = "example-spark"
  
    protected val options = GraphFileOptions(
      input = "/tmp/test.txt",

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fc9fde7f/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
index db96328,a21b3df..02b724e
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@@ -26,6 -26,6 +26,10 @@@ import org.apache.spark.sql.DataFram
  import scala.collection.JavaConverters._
  
  class SinkTest extends BaseSparkTest {
++  override def beforeAll(): Unit = {
++    super.beforeAll()
++    initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
++  }
    def toDataFrame(edges: Seq[String]): DataFrame = {
      import spark.sqlContext.implicits._
      val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
@@@ -42,13 -42,12 +46,11 @@@
      }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
    }
  
 -  test("S2graphSink writeBatch.") {
 -    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
 -
 +  test("S2graphSink writeBatchWithBulkload") {
-     initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
- 
      val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
      val df = toDataFrame(Seq(bulkEdgeString))
 -    val args = options.toCommand.grouped(2).map { kv =>
 +    val args = Map("writeMethod" -> "bulk") ++
 +      options.toCommand.grouped(2).map { kv =>
        kv.head -> kv.last
      }.toMap
  
@@@ -59,25 -58,5 +61,38 @@@
  
      val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
      s2Edges.foreach { edge => println(edge) }
++
++    val filteredEdges = s2Edges.filter{ edge =>
++      edge.srcVertex.innerIdVal.toString == "a" &&
++        edge.tgtVertex.innerIdVal.toString == "b" &&
++        edge.label() == "friends"
++    }
++
++    assert(filteredEdges.size == 1)
 +  }
 +
 +  test("S2graphSink writeBatchWithMutate") {
-     initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
- 
-     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":20}"
++    val bulkEdgeString = "1416236400000\tinsert\tedge\tb\tc\tfriends\t{\"since\":1316236400000,\"score\":20}"
 +    val df = toDataFrame(Seq(bulkEdgeString))
 +    val args = Map("writeMethod" -> "mutate") ++
-       options.toCommand.grouped(2).map { kv =>
-         kv.head -> kv.last
-       }.toMap
++      options.toCommand.grouped(2).map { kv => kv.head -> kv.last }.toMap
 +
 +    val conf = TaskConf("test", "sql", Seq("input"), args)
 +
 +    val sink = new S2graphSink("testQuery", conf)
 +    sink.write(df)
 +
 +    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
 +    s2Edges.foreach { edge => println(edge) }
++
++    val filteredEdges = s2Edges.filter{ edge =>
++      edge.srcVertex.innerIdVal.toString == "b" &&
++      edge.tgtVertex.innerIdVal.toString == "c" &&
++      edge.getTs() == 1416236400000L &&
++      edge.label() == "friends"
++    }
++
++    assert(filteredEdges.size == 1)
    }
 +
  }


[11/11] incubator-s2graph git commit: [S2GRAPH-197]: Provide S2graphSink for non-streaming dataset

Posted by st...@apache.org.
[S2GRAPH-197]: Provide S2graphSink for non-streaming dataset

JIRA:
    [S2GRAPH-197] https://issues.apache.org/jira/browse/S2GRAPH-197

Pull Request:
    Closes #197

Author
    DO YUNG YOON <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/1799ae45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/1799ae45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/1799ae45

Branch: refs/heads/master
Commit: 1799ae4561054873c1050964d3fbcedcf16a6190
Parents: f4bd284
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Apr 4 11:44:18 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed Apr 4 11:44:18 2018 +0900

----------------------------------------------------------------------
 CHANGES | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1799ae45/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 39360ba..077f641 100644
--- a/CHANGES
+++ b/CHANGES
@@ -34,6 +34,7 @@ Release Notes - S2Graph - Version 0.2.0
     * [S2GRAPH-161] - Update CHANGES file with correct release version
     * [S2GRAPH-163] - Update version.sbt after release
     * [S2GRAPH-180] - Implement missing Management API
+    * [S2GRAPH-197] - Provide S2graphSink for non-streaming dataset
 
 ** Bug
     * [S2GRAPH-159] - Wrong syntax at a bash script under Linux


[04/11] incubator-s2graph git commit: separate bulkload from write.

Posted by st...@apache.org.
separate bulkload from write.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/1ac70d54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/1ac70d54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/1ac70d54

Branch: refs/heads/master
Commit: 1ac70d540410242f2b8cff5e2664a519d1e4affd
Parents: 86dcc11
Author: DO YUNG YOON <st...@apache.org>
Authored: Tue Apr 3 10:06:04 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Tue Apr 3 10:06:04 2018 +0900

----------------------------------------------------------------------
 .../s2graph/s2jobs/serde/Transformer.scala      |  2 +-
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 32 +++++++++++---------
 2 files changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1ac70d54/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
index ef1bd29..a448d3f 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
@@ -34,7 +34,7 @@ import scala.reflect.ClassTag
   */
 trait Transformer[M[_]] extends Serializable {
   val config: Config
-  val options: GraphFileOptions
+//  val options: GraphFileOptions
 
   def buildDegrees[T: ClassTag](elements: M[GraphElement])(implicit writer: GraphElementWritable[T]): M[T]
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1ac70d54/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 7c4c857..8c57539 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -209,26 +209,30 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
-  override def write(inputDF: DataFrame): Unit = {
-    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+  private def bulkload(df: DataFrame): Unit = {
+    val options = S2GraphHelper.toGraphFileOptions(conf)
+    val config = Management.toConfig(options.toConfigParams)
+    val input = df.rdd
 
-    if (inputDF.isStreaming) writeStream(df.writeStream)
-    else {
-      val options = S2GraphHelper.toGraphFileOptions(conf)
-      val config = Management.toConfig(options.toConfigParams)
-      val input = df.rdd
+    val transformer = new SparkBulkLoaderTransformer(config, options)
+
+    implicit val reader = new RowBulkFormatReader
+    implicit val writer = new KeyValueWriter
 
-      val transformer = new SparkBulkLoaderTransformer(config, options)
+    val kvs = transformer.transform(input)
 
-      implicit val reader = new RowBulkFormatReader
-      implicit val writer = new KeyValueWriter
+    HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
 
-      val kvs = transformer.transform(input)
+    // finish bulk load by execute LoadIncrementHFile.
+    HFileGenerator.loadIncrementHFile(options)
+  }
 
-      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+  override def write(inputDF: DataFrame): Unit = {
+    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
 
-      // finish bulk load by execute LoadIncrementHFile.
-      HFileGenerator.loadIncrementHFile(options)
+    if (inputDF.isStreaming) writeStream(df.writeStream)
+    else {
+      bulkload(df)
     }
   }
 }


[05/11] incubator-s2graph git commit: add writeBatchWithMutate on S2GraphSink

Posted by st...@apache.org.
add writeBatchWithMutate on S2GraphSink


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/7ad721cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/7ad721cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/7ad721cf

Branch: refs/heads/master
Commit: 7ad721cfa5f87916cf445eabb08f14d8490a95c7
Parents: 86dcc11
Author: Chul Kang <el...@apache.org>
Authored: Tue Apr 3 14:52:50 2018 +0900
Committer: Chul Kang <el...@apache.org>
Committed: Tue Apr 3 14:52:50 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 63 ++++++++++++++++----
 1 file changed, 52 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ad721cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 7c4c857..20c1558 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.s2jobs.task
 
-import com.typesafe.config.Config
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
 import org.apache.hadoop.hbase.HBaseConfiguration
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
 import org.apache.hadoop.util.ToolRunner
@@ -28,10 +28,15 @@ import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 /**
   * Sink
   *
@@ -214,21 +219,57 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
     if (inputDF.isStreaming) writeStream(df.writeStream)
     else {
-      val options = S2GraphHelper.toGraphFileOptions(conf)
-      val config = Management.toConfig(options.toConfigParams)
-      val input = df.rdd
+      conf.options.getOrElse("writeMethod", "mutate") match {
+        case "bulk" => writeBatchWithBulkload(df)
+        case "mutate" => writeBatchWithMutate(df)
+      }
+
+    }
+  }
+
+  private def writeBatchWithBulkload(df:DataFrame):Unit = {
+    val options = S2GraphHelper.toGraphFileOptions(conf)
+    val config = Management.toConfig(options.toConfigParams)
+    val input = df.rdd
+
+    val transformer = new SparkBulkLoaderTransformer(config, options)
+
+    implicit val reader = new RowBulkFormatReader
+    implicit val writer = new KeyValueWriter
+
+    val kvs = transformer.transform(input)
+
+    HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+
+    // finish bulk load by execute LoadIncrementHFile.
+    HFileGenerator.loadIncrementHFile(options)
+  }
+
+  private def writeBatchWithMutate(df:DataFrame):Unit = {
+    import scala.collection.JavaConversions._
+    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+    val graphConfig:Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
+    val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
+
+    val reader = new RowBulkFormatReader
 
-      val transformer = new SparkBulkLoaderTransformer(config, options)
+    val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
+    val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
 
-      implicit val reader = new RowBulkFormatReader
-      implicit val writer = new KeyValueWriter
+    df.foreachPartition{ iters =>
+      val config = ConfigFactory.parseString(serializedConfig)
+      val s2Graph = S2GraphHelper.initS2Graph(config)
 
-      val kvs = transformer.transform(input)
+      val responses = iters.grouped(groupedSize).flatMap { rows =>
+        val elements = rows.flatMap(row => reader.read(s2Graph)(row))
 
-      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+        val mutateF = s2Graph.mutateElements(elements, true)
+        Await.result(mutateF, Duration(waitTime, "seconds"))
+      }
 
-      // finish bulk load by execute LoadIncrementHFile.
-      HFileGenerator.loadIncrementHFile(options)
+      val (success, fail) = responses.toSeq.partition(r => r.isSuccess)
+      logger.info(s"success : ${success.size}, fail : ${fail.size}")
     }
   }
 }


[07/11] incubator-s2graph git commit: filter OptionKeys on GraphFileOptions.

Posted by st...@apache.org.
filter OptionKeys on GraphFileOptions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/c129ed01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/c129ed01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/c129ed01

Branch: refs/heads/master
Commit: c129ed01fed984faebff3eff640d4c1eab249c98
Parents: 1ac70d5
Author: DO YUNG YOON <st...@apache.org>
Authored: Tue Apr 3 15:41:13 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Tue Apr 3 15:41:13 2018 +0900

----------------------------------------------------------------------
 .../scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala     | 9 ---------
 .../org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala | 6 ++++++
 .../main/scala/org/apache/s2graph/s2jobs/task/Sink.scala    | 2 +-
 .../main/scala/org/apache/s2graph/s2jobs/task/Task.scala    | 9 +++++++++
 4 files changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c129ed01/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 69b3716..6e68d28 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -24,10 +24,7 @@ import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-import org.apache.s2graph.s2jobs.task.TaskConf
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types.StructType
 import play.api.libs.json.{JsObject, Json}
 
@@ -85,12 +82,6 @@ object S2GraphHelper {
     }
   }
 
-  def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
-    val args = taskConf.options.flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
-
-    GraphFileOptions.toOption(args)
-  }
-
   def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, reservedColumn: Set[String]): Option[GraphElement] = {
     val timestamp = row.getAs[Long]("timestamp")
     val operation = Try(row.getAs[String]("operation")).getOrElse("insert")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c129ed01/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
index 4bf8379..8aa540c 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
@@ -20,6 +20,12 @@
 package org.apache.s2graph.s2jobs.loader
 
 object GraphFileOptions {
+  val OptionKeys = Set(
+    "--input", "--tempDir", "--output", "--zkQuorum", "--table", "--dbUrl", "--dbUser", "--dbPassword", "--dbDriver",
+    "--maxHFilePerRegionServer", "--numRegions", "--labelMapping", "--autoEdgeCreate", "--buildDegree", "--incrementalLoad",
+    "--method"
+  )
+
   val parser = new scopt.OptionParser[GraphFileOptions]("run") {
 
     opt[String]('i', "input").required().action( (x, c) =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c129ed01/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 8c57539..0dce19a 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -210,7 +210,7 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
   private def bulkload(df: DataFrame): Unit = {
-    val options = S2GraphHelper.toGraphFileOptions(conf)
+    val options = TaskConf.toGraphFileOptions(conf)
     val config = Management.toConfig(options.toConfigParams)
     val input = df.rdd
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c129ed01/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index e8f11e3..ddd56bf 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -20,7 +20,16 @@
 package org.apache.s2graph.s2jobs.task
 
 import org.apache.s2graph.s2jobs.Logger
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 
+object TaskConf {
+  def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
+    val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys)
+      .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
+
+    GraphFileOptions.toOption(args)
+  }
+}
 case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty)
 
 trait Task extends Serializable with Logger {


[06/11] incubator-s2graph git commit: modify tc

Posted by st...@apache.org.
modify tc


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/c47359c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/c47359c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/c47359c1

Branch: refs/heads/master
Commit: c47359c12796521106884b09d464099a64924f5d
Parents: 7ad721c
Author: Chul Kang <el...@apache.org>
Authored: Tue Apr 3 14:54:27 2018 +0900
Committer: Chul Kang <el...@apache.org>
Committed: Tue Apr 3 14:54:27 2018 +0900

----------------------------------------------------------------------
 .../s2graph/s2jobs/S2GraphHelperTest.scala      | 13 +++++++++-
 .../apache/s2graph/s2jobs/task/SinkTest.scala   | 27 +++++++++++++++++---
 2 files changed, 36 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c47359c1/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
index 6e89466..f2b0102 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
@@ -19,6 +19,17 @@
 
 package org.apache.s2graph.s2jobs
 
-class S2GraphHelperTest {
+import org.apache.s2graph.s2jobs.task.TaskConf
 
+class S2GraphHelperTest extends BaseSparkTest {
+  test("toGraphFileOptions") {
+    val args = options.toCommand.grouped(2).map { kv =>
+          kv.head -> kv.last
+      }.toMap ++ Map("db.default.url" -> "jdbc://localhost:3306/mysql")
+
+    println(args)
+    val taskConf = TaskConf("dummy", "sink", Nil, args)
+    val graphFileOptions = S2GraphHelper.toGraphFileOptions(taskConf)
+    println(graphFileOptions)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c47359c1/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
index a21b3df..db96328 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@ -42,12 +42,13 @@ class SinkTest extends BaseSparkTest {
     }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
   }
 
-  test("S2graphSink writeBatch.") {
-    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+  test("S2graphSink writeBatchWithBulkload") {
+    initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
 
     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
     val df = toDataFrame(Seq(bulkEdgeString))
-    val args = options.toCommand.grouped(2).map { kv =>
+    val args = Map("writeMethod" -> "bulk") ++
+      options.toCommand.grouped(2).map { kv =>
       kv.head -> kv.last
     }.toMap
 
@@ -59,4 +60,24 @@ class SinkTest extends BaseSparkTest {
     val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
     s2Edges.foreach { edge => println(edge) }
   }
+
+  test("S2graphSink writeBatchWithMutate") {
+    initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":20}"
+    val df = toDataFrame(Seq(bulkEdgeString))
+    val args = Map("writeMethod" -> "mutate") ++
+      options.toCommand.grouped(2).map { kv =>
+        kv.head -> kv.last
+      }.toMap
+
+    val conf = TaskConf("test", "sql", Seq("input"), args)
+
+    val sink = new S2graphSink("testQuery", conf)
+    sink.write(df)
+
+    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+    s2Edges.foreach { edge => println(edge) }
+  }
+
 }


[09/11] incubator-s2graph git commit: Merge pull request #14 from elric-k/S2GRAPH-197

Posted by st...@apache.org.
Merge pull request #14 from elric-k/S2GRAPH-197

[S2GRAPH-197] add writeBatchWithMutate on S2GraphSink

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f2857d1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f2857d1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f2857d1f

Branch: refs/heads/master
Commit: f2857d1f2cd5a6615e10885b7e18a45bc40b4e8e
Parents: c129ed0 fc9fde7
Author: Doyung Yoon <st...@apache.org>
Authored: Wed Apr 4 06:53:04 2018 +0900
Committer: GitHub <no...@github.com>
Committed: Wed Apr 4 06:53:04 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 44 ++++++++++++++++++--
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |  2 -
 .../s2graph/s2jobs/S2GraphHelperTest.scala      | 13 +++++-
 .../apache/s2graph/s2jobs/task/SinkTest.scala   | 44 ++++++++++++++++++--
 4 files changed, 93 insertions(+), 10 deletions(-)
----------------------------------------------------------------------